aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-12-09 17:01:57 +0100
committerPer Larsson <[email protected]>2021-12-09 17:01:57 +0100
commit20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6 (patch)
treefbb0f274840a12c32d93c7e342c0427f2617a651
parentDisabled cache tracker. (diff)
parentReturn status_code as ErrorCode from jupiter api if not successful (diff)
downloadzen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.tar.xz
zen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.zip
Merged main.
-rw-r--r--.gitignore3
-rw-r--r--zencore/include/zencore/iobuffer.h47
-rw-r--r--zencore/iobuffer.cpp55
-rw-r--r--zenhttp/httpasio.cpp99
-rw-r--r--zenhttp/httpserver.cpp81
-rw-r--r--zenhttp/httpsys.cpp167
-rw-r--r--zenhttp/httpsys.h20
-rw-r--r--zenhttp/include/zenhttp/httpserver.h2
-rw-r--r--zenserver/compute/apply.cpp31
-rw-r--r--zenserver/compute/apply.h2
-rw-r--r--zenserver/config.cpp33
-rw-r--r--zenserver/upstream/jupiter.cpp72
-rw-r--r--zenserver/upstream/upstreamapply.cpp94
-rw-r--r--zenserver/xmake.lua1
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenutil/zenserverprocess.cpp24
16 files changed, 494 insertions, 239 deletions
diff --git a/.gitignore b/.gitignore
index de0d3019b..41757ed52 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,6 +23,8 @@ build/
[Bb]in/
[Oo]bj/
[Ll]og/
+vsxmake20*/
+vs20*/
# Visual Studio Code options directory
.vscode/
@@ -213,6 +215,7 @@ __pycache__/
.minio_data/
.test/
.xmake/
+.gdb_history
# Tags
TAGS
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 04b3b33dd..9d18a55e9 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -2,6 +2,7 @@
#pragma once
+#include <atomic>
#include <memory.h>
#include <zencore/memory.h>
#include "refcount.h"
@@ -111,39 +112,39 @@ public:
inline void EnsureDataValid() const
{
- if ((m_Flags & kIsExtended) && !(m_Flags & kIsMaterialized))
+ const uint32_t LocalFlags = m_Flags.load(std::memory_order_acquire);
+ if ((LocalFlags & kIsExtended) && !(LocalFlags & kIsMaterialized))
{
Materialize();
}
}
- inline bool IsOwnedByThis() const { return !!(m_Flags & kIsOwnedByThis); }
+ inline bool IsOwnedByThis() const { return !!(m_Flags.load(std::memory_order_relaxed) & kIsOwnedByThis); }
inline void SetIsOwnedByThis(bool NewState)
{
if (NewState)
{
- m_Flags |= kIsOwnedByThis;
+ m_Flags.fetch_or(kIsOwnedByThis, std::memory_order_relaxed);
}
else
{
- m_Flags &= ~kIsOwnedByThis;
+ m_Flags.fetch_and(~kIsOwnedByThis, std::memory_order_relaxed);
}
}
inline bool IsOwned() const
{
- bool ThisIsOwned = !!(m_Flags & kIsOwnedByThis);
- if (ThisIsOwned)
+ if (IsOwnedByThis())
{
return true;
}
return m_OuterCore && m_OuterCore->IsOwned();
}
- inline bool IsImmutable() const { return (m_Flags & kIsMutable) == 0; }
- inline bool IsWholeFile() const { return (m_Flags & kIsWholeFile) != 0; }
- inline bool IsNull() const { return (m_Flags & kIsNull) != 0; }
+ inline bool IsImmutable() const { return (m_Flags.load(std::memory_order_relaxed) & kIsMutable) == 0; }
+ inline bool IsWholeFile() const { return (m_Flags.load(std::memory_order_relaxed) & kIsWholeFile) != 0; }
+ inline bool IsNull() const { return (m_Flags.load(std::memory_order_relaxed) & kIsNull) != 0; }
inline IoBufferExtendedCore* ExtendedCore();
inline const IoBufferExtendedCore* ExtendedCore() const;
@@ -168,11 +169,11 @@ public:
{
if (!NewState)
{
- m_Flags |= kIsMutable;
+ m_Flags.fetch_or(kIsMutable, std::memory_order_relaxed);
}
else
{
- m_Flags &= ~kIsMutable;
+ m_Flags.fetch_and(~kIsMutable, std::memory_order_relaxed);
}
}
@@ -180,27 +181,35 @@ public:
{
if (NewState)
{
- m_Flags |= kIsWholeFile;
+ m_Flags.fetch_or(kIsWholeFile, std::memory_order_relaxed);
}
else
{
- m_Flags |= ~kIsWholeFile;
+ m_Flags.fetch_and(~kIsWholeFile, std::memory_order_relaxed);
}
}
inline void SetContentType(ZenContentType ContentType)
{
ZEN_ASSERT_SLOW((uint32_t(ContentType) & kContentTypeMask) == uint32_t(ContentType));
- m_Flags = (m_Flags & ~(kContentTypeMask << kContentTypeShift)) | (uint32_t(ContentType) << kContentTypeShift);
+ uint32_t OldValue = m_Flags.load(std::memory_order_relaxed);
+ uint32_t NewValue;
+ do
+ {
+ NewValue = (OldValue & ~(kContentTypeMask << kContentTypeShift)) | (uint32_t(ContentType) << kContentTypeShift);
+ } while (!m_Flags.compare_exchange_weak(OldValue, NewValue, std::memory_order_relaxed, std::memory_order_relaxed));
}
- inline ZenContentType GetContentType() const { return ZenContentType((m_Flags >> kContentTypeShift) & kContentTypeMask); }
+ inline ZenContentType GetContentType() const
+ {
+ return ZenContentType((m_Flags.load(std::memory_order_relaxed) >> kContentTypeShift) & kContentTypeMask);
+ }
inline uint32_t GetRefCount() const { return m_RefCount; }
protected:
uint32_t m_RefCount = 0;
- mutable uint32_t m_Flags = 0;
+ mutable std::atomic<uint32_t> m_Flags{0};
mutable const void* m_DataPtr = nullptr;
size_t m_DataBytes = 0;
RefPtr<const IoBufferCore> m_OuterCore;
@@ -213,7 +222,7 @@ protected:
static_assert((uint32_t(ZenContentType::kUnknownContentType) & ~kContentTypeMask) == 0);
- enum Flags
+ enum Flags : uint32_t
{
kIsNull = 1 << 0, // This is a null IoBuffer
kIsMutable = 1 << 1,
@@ -266,7 +275,7 @@ private:
inline IoBufferExtendedCore*
IoBufferCore::ExtendedCore()
{
- if (m_Flags & kIsExtended)
+ if (m_Flags.load(std::memory_order_relaxed) & kIsExtended)
{
return static_cast<IoBufferExtendedCore*>(this);
}
@@ -277,7 +286,7 @@ IoBufferCore::ExtendedCore()
inline const IoBufferExtendedCore*
IoBufferCore::ExtendedCore() const
{
- if (m_Flags & kIsExtended)
+ if (m_Flags.load(std::memory_order_relaxed) & kIsExtended)
{
return static_cast<const IoBufferExtendedCore*>(this);
}
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index 6cee3f60d..e2c112a79 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -39,7 +39,7 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment)
#if ZEN_PLATFORM_WINDOWS
if (((InSize & 0xffFF) == 0) && (Alignment == 0x10000))
{
- m_Flags |= kLowLevelAlloc;
+ m_Flags.fetch_or(kLowLevelAlloc, std::memory_order_relaxed);
m_DataPtr = VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE);
return;
@@ -48,7 +48,7 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment)
#if ZEN_USE_MIMALLOC
void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment));
- m_Flags |= kIoBufferAlloc;
+ m_Flags.fetch_or(kIoBufferAlloc, std::memory_order_relaxed);
#else
void* Ptr = Memory::Alloc(InSize, Alignment);
#endif
@@ -66,8 +66,9 @@ IoBufferCore::FreeBuffer()
return;
}
+ const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed);
#if ZEN_PLATFORM_WINDOWS
- if (m_Flags & kLowLevelAlloc)
+ if (LocalFlags & kLowLevelAlloc)
{
VirtualFree(const_cast<void*>(m_DataPtr), 0, MEM_DECOMMIT);
@@ -76,7 +77,7 @@ IoBufferCore::FreeBuffer()
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_USE_MIMALLOC
- if (m_Flags & kIoBufferAlloc)
+ if (LocalFlags & kIoBufferAlloc)
{
return mi_free(const_cast<void*>(m_DataPtr));
}
@@ -170,12 +171,13 @@ IoBufferExtendedCore::IoBufferExtendedCore(void* FileHandle, uint64_t Offset, ui
, m_FileHandle(FileHandle)
, m_FileOffset(Offset)
{
- m_Flags |= kIsOwnedByThis | kIsExtended;
+ uint32_t NewFlags = kIsOwnedByThis | kIsExtended;
if (TransferHandleOwnership)
{
- m_Flags |= kOwnsFile;
+ NewFlags |= kOwnsFile;
}
+ m_Flags.fetch_or(NewFlags, std::memory_order_relaxed);
}
IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, uint64_t Offset, uint64_t Size)
@@ -183,7 +185,7 @@ IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, ui
, m_FileHandle(Outer->m_FileHandle)
, m_FileOffset(Outer->m_FileOffset + Offset)
{
- m_Flags |= kIsOwnedByThis | kIsExtended;
+ m_Flags.fetch_or(kIsOwnedByThis | kIsExtended, std::memory_order_relaxed);
}
IoBufferExtendedCore::~IoBufferExtendedCore()
@@ -198,14 +200,15 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
#endif
}
+ const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed);
#if ZEN_PLATFORM_WINDOWS
- if (m_Flags & kOwnsMmap)
+ if (LocalFlags & kOwnsMmap)
{
CloseHandle(m_MmapHandle);
}
#endif
- if (m_Flags & kOwnsFile)
+ if (LocalFlags & kOwnsFile)
{
#if ZEN_PLATFORM_WINDOWS
BOOL Success = CloseHandle(m_FileHandle);
@@ -233,43 +236,46 @@ IoBufferExtendedCore::Materialize() const
// The synchronization scheme here is very primitive, if we end up with
// a lot of contention we can make it more fine-grained
- if (m_MmapHandle)
+ if (m_Flags.load(std::memory_order_acquire) & kIsMaterialized)
return;
RwLock::ExclusiveLockScope _(g_MappingLock);
// Someone could have gotten here first
- if (m_MmapHandle)
+ // We can use memory_order_relaxed on this load because the mutex has already provided the fence
+ if (m_Flags.load(std::memory_order_relaxed) & kIsMaterialized)
return;
+ uint32_t NewFlags = kIsMaterialized;
+
const uint64_t MapOffset = m_FileOffset & ~0xffffull;
const uint64_t MappedOffsetDisplacement = m_FileOffset - MapOffset;
const uint64_t MapSize = m_DataBytes + MappedOffsetDisplacement;
#if ZEN_PLATFORM_WINDOWS
- m_MmapHandle = CreateFileMapping(m_FileHandle,
- /* lpFileMappingAttributes */ nullptr,
- /* flProtect */ PAGE_READONLY,
- /* dwMaximumSizeLow */ 0,
- /* dwMaximumSizeHigh */ 0,
- /* lpName */ nullptr);
-
- if (m_MmapHandle == nullptr)
+ void* NewMmapHandle = CreateFileMapping(m_FileHandle,
+ /* lpFileMappingAttributes */ nullptr,
+ /* flProtect */ PAGE_READONLY,
+ /* dwMaximumSizeLow */ 0,
+ /* dwMaximumSizeHigh */ 0,
+ /* lpName */ nullptr);
+
+ if (NewMmapHandle == nullptr)
{
throw std::system_error(std::error_code(::GetLastError(), std::system_category()),
"CreateFileMapping failed on file '{}'"_format(zen::PathFromHandle(m_FileHandle)));
}
- m_Flags |= kOwnsMmap;
+ NewFlags |= kOwnsMmap;
- void* MappedBase = MapViewOfFile(m_MmapHandle,
+ void* MappedBase = MapViewOfFile(NewMmapHandle,
/* dwDesiredAccess */ FILE_MAP_READ,
/* FileOffsetHigh */ uint32_t(MapOffset >> 32),
/* FileOffsetLow */ uint32_t(MapOffset & 0xffFFffFFu),
/* dwNumberOfBytesToMap */ MapSize);
#else
- m_MmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0)
- m_Flags |= kOwnsMmap;
+ NewMmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0)
+ NewFlags |= kOwnsMmap;
void* MappedBase = mmap(
/* addr */ nullptr,
@@ -289,8 +295,9 @@ IoBufferExtendedCore::Materialize() const
m_MappedPointer = MappedBase;
m_DataPtr = reinterpret_cast<uint8_t*>(MappedBase) + MappedOffsetDisplacement;
+ m_MmapHandle = NewMmapHandle;
- m_Flags |= kIsMaterialized;
+ m_Flags.fetch_or(NewFlags, std::memory_order_release);
}
bool
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp
index d5fe9adbb..7ee7193d1 100644
--- a/zenhttp/httpasio.cpp
+++ b/zenhttp/httpasio.cpp
@@ -15,6 +15,14 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <asio.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+#define ASIO_VERBOSE_TRACE 0
+
+#if ASIO_VERBOSE_TRACE
+#define ZEN_TRACE_VERBOSE ZEN_TRACE
+#else
+#define ZEN_TRACE_VERBOSE(fmtstr, ...)
+#endif
+
namespace zen::asio_http {
using namespace std::literals;
@@ -114,7 +122,7 @@ struct HttpRequest
HttpVerb RequestVerb() const { return m_RequestVerb; }
bool IsKeepAlive() const { return m_KeepAlive; }
- std::string_view Url() const { return std::string_view(m_Url, m_UrlLength); }
+ std::string_view Url() const { return m_NormalizedUrl.empty() ? std::string_view(m_Url, m_UrlLength) : m_NormalizedUrl; }
std::string_view QueryString() const { return std::string_view(m_QueryString, m_QueryLength); }
IoBuffer Body() { return m_BodyBuffer; }
@@ -171,6 +179,7 @@ private:
uint64_t m_BodyPosition = 0;
http_parser m_Parser;
char m_HeaderBuffer[1024];
+ std::string m_NormalizedUrl;
void AppendInputBytes(const char* Data, size_t Bytes);
void AppendCurrentHeader();
@@ -318,6 +327,7 @@ private:
std::unique_ptr<asio::ip::tcp::socket> m_Socket;
std::atomic<uint32_t> m_RequestCounter{0};
uint32_t m_ConnectionId = 0;
+ Ref<IHttpPackageHandler> m_PackageHandler;
RwLock m_ResponsesLock;
std::deque<std::unique_ptr<HttpResponse>> m_Responses;
@@ -330,12 +340,12 @@ HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::uniq
, m_Socket(std::move(Socket))
, m_ConnectionId(g_ConnectionIdCounter.fetch_add(1))
{
- ZEN_TRACE("new connection #{}", m_ConnectionId);
+ ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId);
}
HttpServerConnection::~HttpServerConnection()
{
- ZEN_TRACE("destroying connection #{}", m_ConnectionId);
+ ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId);
}
void
@@ -371,18 +381,18 @@ HttpServerConnection::EnqueueRead()
asio::async_read(*m_Socket.get(),
m_RequestBuffer,
- asio::transfer_at_least(16),
+ asio::transfer_at_least(1),
[Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); });
}
void
-HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount)
+HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount)
{
if (Ec)
{
if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kInitialRead)
{
- ZEN_TRACE("on data received ERROR (EXPECTED), connection '{}' reason '{}'", m_ConnectionId, Ec.message());
+ ZEN_TRACE_VERBOSE("on data received ERROR (EXPECTED), connection '{}' reason '{}'", m_ConnectionId, Ec.message());
return;
}
else
@@ -392,7 +402,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t Byt
}
}
- ZEN_TRACE("on data received, connection '{}', request '{}', thread '{}', bytes '{}'",
+ ZEN_TRACE_VERBOSE("on data received, connection '{}', request '{}', thread '{}', bytes '{}'",
m_ConnectionId,
m_RequestCounter.load(std::memory_order_relaxed),
GetCurrentThreadId(),
@@ -421,7 +431,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t Byt
}
void
-HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop)
+HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop)
{
if (Ec)
{
@@ -430,7 +440,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t
}
else
{
- ZEN_TRACE("on data sent, connection '{}', request '{}', thread '{}', bytes '{}'",
+ ZEN_TRACE_VERBOSE("on data sent, connection '{}', request '{}', thread '{}', bytes '{}'",
m_ConnectionId,
m_RequestCounter.load(std::memory_order_relaxed),
GetCurrentThreadId(),
@@ -485,9 +495,23 @@ HttpServerConnection::HandleRequest()
{
HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body());
- ZEN_TRACE("handle request, connection '{}' request '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed));
- Service->HandleRequest(Request);
+ ZEN_TRACE_VERBOSE("handle request, connection '{}' request '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed));
+
+ if (!HandlePackageOffers(*Service, Request, m_PackageHandler))
+ {
+ try
+ {
+ Service->HandleRequest(Request);
+ }
+ catch (std::exception& ex)
+ {
+ ZEN_ERROR("Caught exception while handling request: '{}'", ex.what());
+
+ Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, ex.what());
+ }
+
+ }
if (std::unique_ptr<HttpResponse> Response = std::move(Request.m_Response))
{
@@ -737,6 +761,37 @@ HttpRequest::TerminateConnection()
m_Connection.TerminateConnection();
}
+static void
+NormalizeUrlPath(const char* Url, size_t UrlLength, std::string& NormalizedUrl)
+{
+ bool LastCharWasSeparator = false;
+ for (std::string_view::size_type UrlIndex = 0; UrlIndex < UrlLength; ++UrlIndex)
+ {
+ const char UrlChar = Url[UrlIndex];
+ const bool IsSeparator = (UrlChar == '/');
+
+ if (IsSeparator && LastCharWasSeparator)
+ {
+ if (NormalizedUrl.empty())
+ {
+ NormalizedUrl.reserve(UrlLength);
+ NormalizedUrl.append(Url, UrlIndex);
+ }
+
+ if (!LastCharWasSeparator)
+ {
+ NormalizedUrl.push_back('/');
+ }
+ }
+ else if (!NormalizedUrl.empty())
+ {
+ NormalizedUrl.push_back(UrlChar);
+ }
+
+ LastCharWasSeparator = IsSeparator;
+ }
+}
+
int
HttpRequest::OnHeadersComplete()
{
@@ -803,6 +858,9 @@ HttpRequest::OnHeadersComplete()
m_QueryLength = Url.size() - QuerySplit - 1;
}
+ NormalizeUrlPath(m_Url, m_UrlLength, m_NormalizedUrl);
+
+
return 0;
}
@@ -843,6 +901,7 @@ HttpRequest::ResetState()
m_BodyBuffer = {};
m_BodyPosition = 0;
m_Headers.clear();
+ m_NormalizedUrl.clear();
}
int
@@ -935,7 +994,7 @@ HttpAsioServerRequest::HttpAsioServerRequest(asio_http::HttpRequest& Request, Ht
const int PrefixLength = Service.UriPrefixLength();
std::string_view Uri = Request.Url();
- Uri.remove_prefix(PrefixLength);
+ Uri.remove_prefix(std::min(PrefixLength, static_cast<int>(Uri.size())));
m_Uri = Uri;
m_QueryString = Request.QueryString();
@@ -1099,6 +1158,10 @@ HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service)
{
std::string_view UrlPath(InUrlPath);
Service.SetUriPrefixLength(UrlPath.size());
+ if (!UrlPath.empty() && UrlPath.back() == '/')
+ {
+ UrlPath.remove_suffix(1);
+ }
RwLock::ExclusiveLockScope _(m_Lock);
m_UriHandlers.push_back({std::string(UrlPath), &Service});
@@ -1109,16 +1172,22 @@ HttpAsioServerImpl::RouteRequest(std::string_view Url)
{
RwLock::SharedLockScope _(m_Lock);
+ HttpService* CandidateService = nullptr;
+ std::string::size_type CandidateMatchSize = 0;
for (const ServiceEntry& SvcEntry : m_UriHandlers)
{
const std::string& SvcUrl = SvcEntry.ServiceUrlPath;
- if (Url.compare(0, SvcUrl.size(), SvcUrl) == 0)
+ const std::string::size_type SvcUrlSize = SvcUrl.size();
+ if ((SvcUrlSize >= CandidateMatchSize) &&
+ Url.compare(0, SvcUrlSize, SvcUrl) == 0 &&
+ ((SvcUrlSize == Url.size()) || (Url[SvcUrlSize] == '/')))
{
- return SvcEntry.Service;
+ CandidateMatchSize = SvcUrl.size();
+ CandidateService = SvcEntry.Service;
}
}
- return nullptr;
+ return CandidateService;
}
} // namespace zen::asio_http
diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp
index 011468715..3326f3d4a 100644
--- a/zenhttp/httpserver.cpp
+++ b/zenhttp/httpserver.cpp
@@ -7,6 +7,7 @@
#include "httpsys.h"
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
@@ -637,6 +638,86 @@ CreateHttpServer(std::string_view ServerClass)
//////////////////////////////////////////////////////////////////////////
+bool
+HandlePackageOffers(HttpService& Service, HttpServerRequest& Request, Ref<IHttpPackageHandler>& PackageHandlerRef)
+{
+ if (Request.RequestVerb() == HttpVerb::kPost)
+ {
+ if (Request.RequestContentType() == HttpContentType::kCbPackageOffer)
+ {
+ // The client is presenting us with a package attachments offer, we need
+ // to filter it down to the list of attachments we need them to send in
+ // the follow-up request
+
+ PackageHandlerRef = Service.HandlePackageRequest(Request);
+
+ if (PackageHandlerRef)
+ {
+ CbObject OfferMessage = LoadCompactBinaryObject(Request.ReadPayload());
+
+ std::vector<IoHash> OfferCids;
+
+ for (auto& CidEntry : OfferMessage["offer"])
+ {
+ if (!CidEntry.IsHash())
+ {
+ // Should yield bad request response?
+
+ ZEN_WARN("found invalid entry in offer");
+
+ continue;
+ }
+
+ OfferCids.push_back(CidEntry.AsHash());
+ }
+
+ ZEN_TRACE("request #{} -> filtering offer of {} entries", Request.RequestId(), OfferCids.size());
+
+ PackageHandlerRef->FilterOffer(OfferCids);
+
+ ZEN_TRACE("request #{} -> filtered to {} entries", Request.RequestId(), OfferCids.size());
+
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+
+ for (const IoHash& Cid : OfferCids)
+ {
+ ResponseWriter.AddHash(Cid);
+ }
+
+ ResponseWriter.EndArray();
+
+ // Emit filter response
+ Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
+ return true;
+ }
+ }
+ else if (Request.RequestContentType() == HttpContentType::kCbPackage)
+ {
+ // Process chunks in package request
+
+ PackageHandlerRef = Service.HandlePackageRequest(Request);
+
+ // TODO: this should really be done in a streaming fashion, currently this emulates
+ // the intended flow from an API perspective
+
+ if (PackageHandlerRef)
+ {
+ PackageHandlerRef->OnRequestBegin();
+
+ auto CreateBuffer = [&](const IoHash& Cid, uint64_t Size) -> IoBuffer { return PackageHandlerRef->CreateTarget(Cid, Size); };
+
+ CbPackage Package = ParsePackageMessage(Request.ReadPayload(), CreateBuffer);
+
+ PackageHandlerRef->OnRequestComplete();
+ }
+ }
+ }
+ return false;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
#if ZEN_WITH_TESTS
TEST_CASE("http.common")
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index cdf9e0a39..e9472e3b8 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -748,15 +748,20 @@ HttpSysServer::~HttpSysServer()
}
void
-HttpSysServer::Initialize(const wchar_t* UrlPath)
+HttpSysServer::InitializeServer(int BasePort)
{
+ using namespace std::literals;
+
+ WideStringBuilder<64> WildcardUrlPath;
+ WildcardUrlPath << u8"http://*:"sv << int64_t(BasePort) << u8"/"sv;
+
m_IsOk = false;
ULONG Result = HttpCreateServerSession(HTTPAPI_VERSION_2, &m_HttpSessionId, 0);
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(UrlPath), Result);
+ ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
return;
}
@@ -765,18 +770,44 @@ HttpSysServer::Initialize(const wchar_t* UrlPath)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(UrlPath), Result);
+ ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
return;
}
- m_BaseUri = UrlPath;
+ Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0);
- Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, UrlPath, HTTP_URL_CONTEXT(0), 0);
+ m_BaseUris.clear();
+ if (Result == NO_ERROR)
+ {
+ m_BaseUris.push_back(WildcardUrlPath.c_str());
+ }
+ else
+ {
+ // If we can't register the wildcard path, we fall back to local paths
+ // This local paths allow requests originating locally to function, but will not allow
+ // remote origin requests to function. This can be remedied by using netsh
+ // during an install process to grant permissions to route public access to the appropriate
+ // port for the current user. eg:
+ // netsh http add urlacl url=http://*:1337/ user=<some_user>
- if (Result != NO_ERROR)
+ const std::u8string_view Hosts[] = { u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv };
+
+ for (const std::u8string_view Host : Hosts)
+ {
+ WideStringBuilder<64> LocalUrlPath;
+ LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(BasePort) << u8"/"sv;
+
+ if (HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0) == NO_ERROR)
+ {
+ m_BaseUris.push_back(LocalUrlPath.c_str());
+ }
+ }
+ }
+
+ if (m_BaseUris.empty())
{
- ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(UrlPath), Result);
+ ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result);
return;
}
@@ -791,7 +822,7 @@ HttpSysServer::Initialize(const wchar_t* UrlPath)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(UrlPath), Result);
+ ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
return;
}
@@ -803,7 +834,7 @@ HttpSysServer::Initialize(const wchar_t* UrlPath)
if (Result != NO_ERROR)
{
- ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(UrlPath), Result);
+ ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result);
return;
}
@@ -815,13 +846,13 @@ HttpSysServer::Initialize(const wchar_t* UrlPath)
if (ErrorCode)
{
- ZEN_ERROR("Failed to create IOCP for '{}': {}", WideToUtf8(UrlPath), ErrorCode.message());
+ ZEN_ERROR("Failed to create IOCP for '{}': {}", WideToUtf8(m_BaseUris.front()), ErrorCode.message());
}
else
{
m_IsOk = true;
- ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(UrlPath));
+ ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front()));
}
}
@@ -952,15 +983,18 @@ HttpSysServer::RegisterService(const char* UrlPath, HttpService& Service)
// Convert to wide string
- std::wstring Url16 = m_BaseUri + PathUtf16;
+ for (const std::wstring& BaseUri : m_BaseUris)
+ {
+ std::wstring Url16 = BaseUri + PathUtf16;
- ULONG Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, Url16.c_str(), HTTP_URL_CONTEXT(&Service), 0 /* Reserved */);
+ ULONG Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, Url16.c_str(), HTTP_URL_CONTEXT(&Service), 0 /* Reserved */);
- if (Result != NO_ERROR)
- {
- ZEN_ERROR("HttpAddUrlToUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result));
+ if (Result != NO_ERROR)
+ {
+ ZEN_ERROR("HttpAddUrlToUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result));
- return;
+ return;
+ }
}
}
@@ -978,13 +1012,16 @@ HttpSysServer::UnregisterService(const char* UrlPath, HttpService& Service)
// Convert to wide string
- std::wstring Url16 = m_BaseUri + PathUtf16;
+ for (const std::wstring& BaseUri : m_BaseUris)
+ {
+ std::wstring Url16 = BaseUri + PathUtf16;
- ULONG Result = HttpRemoveUrlFromUrlGroup(m_HttpUrlGroupId, Url16.c_str(), 0);
+ ULONG Result = HttpRemoveUrlFromUrlGroup(m_HttpUrlGroupId, Url16.c_str(), 0);
- if (Result != NO_ERROR)
- {
- ZEN_ERROR("HttpRemoveUrlFromUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result));
+ if (Result != NO_ERROR)
+ {
+ ZEN_ERROR("HttpRemoveUrlFromUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result));
+ }
}
}
@@ -1131,83 +1168,12 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload)
{
HttpSysServerRequest& ThisRequest = m_HandlerRequest.emplace(*this, Service, Payload);
- if (ThisRequest.RequestVerb() == HttpVerb::kPost)
- {
- if (ThisRequest.RequestContentType() == HttpContentType::kCbPackageOffer)
- {
- // The client is presenting us with a package attachments offer, we need
- // to filter it down to the list of attachments we need them to send in
- // the follow-up request
-
- m_PackageHandler = Service.HandlePackageRequest(ThisRequest);
-
- if (m_PackageHandler)
- {
- CbObject OfferMessage = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> OfferCids;
-
- for (auto& CidEntry : OfferMessage["offer"])
- {
- if (!CidEntry.IsHash())
- {
- // Should yield bad request response?
-
- ZEN_WARN("found invalid entry in offer");
-
- continue;
- }
-
- OfferCids.push_back(CidEntry.AsHash());
- }
-
- ZEN_TRACE("request #{} -> filtering offer of {} entries", ThisRequest.RequestId(), OfferCids.size());
-
- m_PackageHandler->FilterOffer(OfferCids);
-
- ZEN_TRACE("request #{} -> filtered to {} entries", ThisRequest.RequestId(), OfferCids.size());
-
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("need");
-
- for (const IoHash& Cid : OfferCids)
- {
- ResponseWriter.AddHash(Cid);
- }
-
- ResponseWriter.EndArray();
-
- // Emit filter response
- ThisRequest.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save());
-
- return ThisRequest;
- }
- }
- else if (ThisRequest.RequestContentType() == HttpContentType::kCbPackage)
- {
- // Process chunks in package request
-
- m_PackageHandler = Service.HandlePackageRequest(ThisRequest);
-
- // TODO: this should really be done in a streaming fashion, currently this emulates
- // the intended flow from an API perspective
-
- if (m_PackageHandler)
- {
- m_PackageHandler->OnRequestBegin();
-
- auto CreateBuffer = [&](const IoHash& Cid, uint64_t Size) -> IoBuffer { return m_PackageHandler->CreateTarget(Cid, Size); };
-
- CbPackage Package = ParsePackageMessage(ThisRequest.ReadPayload(), CreateBuffer);
-
- m_PackageHandler->OnRequestComplete();
- }
- }
- }
-
// Default request handling
- Service.HandleRequest(ThisRequest);
+ if (!HandlePackageOffers(Service, ThisRequest, m_PackageHandler))
+ {
+ Service.HandleRequest(ThisRequest);
+ }
return ThisRequest;
}
@@ -1641,12 +1607,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
void
HttpSysServer::Initialize(int BasePort)
{
- using namespace std::literals;
-
- WideStringBuilder<64> BaseUri;
- BaseUri << u8"http://*:"sv << int64_t(BasePort) << u8"/"sv;
-
- Initialize(BaseUri.c_str());
+ InitializeServer(BasePort);
StartServer();
}
diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h
index 46ba122cc..7df8fba8f 100644
--- a/zenhttp/httpsys.h
+++ b/zenhttp/httpsys.h
@@ -52,7 +52,7 @@ public:
inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; }
private:
- void Initialize(const wchar_t* UrlPath);
+ void InitializeServer(int BasePort);
void Cleanup();
void StartServer();
@@ -75,15 +75,15 @@ private:
WinIoThreadPool m_ThreadPool;
WorkerThreadPool m_AsyncWorkPool;
- std::wstring m_BaseUri; // http://*:nnnn/
- HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
- HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0;
- HANDLE m_RequestQueueHandle = 0;
- std::atomic_int32_t m_PendingRequests{0};
- std::atomic<int32_t> m_IsShuttingDown{0};
- int32_t m_MinPendingRequests = 16;
- int32_t m_MaxPendingRequests = 128;
- Event m_ShutdownEvent;
+ std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/
+ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0;
+ HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0;
+ HANDLE m_RequestQueueHandle = 0;
+ std::atomic_int32_t m_PendingRequests{0};
+ std::atomic_int32_t m_IsShuttingDown{0};
+ int32_t m_MinPendingRequests = 16;
+ int32_t m_MaxPendingRequests = 128;
+ Event m_ShutdownEvent;
};
} // namespace zen
diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h
index 19b4c63d6..b32359d67 100644
--- a/zenhttp/include/zenhttp/httpserver.h
+++ b/zenhttp/include/zenhttp/httpserver.h
@@ -303,6 +303,8 @@ private:
std::map<std::string, RpcFunction> m_Functions;
};
+bool HandlePackageOffers(HttpService& Service, HttpServerRequest& Request, Ref<IHttpPackageHandler>& PackageHandlerRef);
+
void http_forcelink(); // internal
} // namespace zen
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 1f18b054f..ae4dd4528 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -578,9 +578,13 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
if (NeedList.empty())
{
// We already have everything
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
- CbObject Output = ExecActionUpstream(Worker, RequestObject);
-
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -638,8 +642,13 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
zen::NiceBytes(TotalNewBytes),
NewAttachmentCount);
- CbObject Output = ExecActionUpstream(Worker, ActionObj);
+ CbObject Output;
+ HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
break;
@@ -881,8 +890,8 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
return OutputPackage;
}
-CbObject
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action)
+HttpResponseCode
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
{
const IoHash WorkerId = Worker.Descriptor.GetHash();
const IoHash ActionId = Action.GetHash();
@@ -895,14 +904,19 @@ HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Actio
if (!EnqueueResult.Success)
{
- throw std::runtime_error("Error enqueuing upstream task");
+ ZEN_ERROR(
+ "Error enqueuing upstream Action {}/{}",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString());
+ return HttpResponseCode::InternalServerError;
}
CbObjectWriter Writer;
Writer.AddHash("worker", WorkerId);
Writer.AddHash("action", ActionId);
- return std::move(Writer.Save());
+ Object = std::move(Writer.Save());
+ return HttpResponseCode::OK;
}
HttpResponseCode
@@ -932,8 +946,7 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHa
Completed.Error.Reason,
Completed.Error.ErrorCode);
- throw std::runtime_error(
- "Action {}/{} failed"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
+ return HttpResponseCode::InternalServerError;
}
ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 15cda4750..0d6edf119 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -46,7 +46,7 @@ private:
[[nodiscard]] std::filesystem::path CreateNewSandbox();
[[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action);
- [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action);
+ [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object);
[[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
RwLock m_WorkerLock;
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 8d9339e92..23450c370 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -45,6 +45,39 @@ PickDefaultStateDirectory()
return myDocumentsDir;
}
+ int CandidateDriveLetterOffset = -1;
+ ULARGE_INTEGER CandidateDriveSize;
+ CandidateDriveSize.QuadPart = 0L;
+ DWORD LogicalDrives = GetLogicalDrives();
+ char CandidateDriveName[] = "A:\\";
+ for (int DriveLetterOffset = 0; DriveLetterOffset < 32; ++DriveLetterOffset)
+ {
+ if ((LogicalDrives & (1 << DriveLetterOffset)) != 0)
+ {
+ CandidateDriveName[0] = (char)('A' + DriveLetterOffset);
+ if (GetDriveTypeA(CandidateDriveName) == DRIVE_FIXED)
+ {
+ ULARGE_INTEGER FreeBytesAvailableToCaller;
+ ULARGE_INTEGER TotalNumberOfBytes;
+ if (0 != GetDiskFreeSpaceExA(CandidateDriveName, &FreeBytesAvailableToCaller, &TotalNumberOfBytes, nullptr))
+ {
+ if ((FreeBytesAvailableToCaller.QuadPart > 0) && (TotalNumberOfBytes.QuadPart > CandidateDriveSize.QuadPart))
+ {
+ CandidateDriveLetterOffset = DriveLetterOffset;
+ CandidateDriveSize = TotalNumberOfBytes;
+ }
+ }
+ }
+ }
+ }
+
+ if (CandidateDriveLetterOffset >= 0)
+ {
+ char RootZenFolderName[] = "A:\\zen";
+ RootZenFolderName[0] = (char)('A' + CandidateDriveLetterOffset);
+ return RootZenFolderName;
+ }
+
return L"";
}
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 1f82f4a04..f9be068ec 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -116,7 +116,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -161,7 +165,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -198,7 +206,11 @@ CloudCacheSession::GetBlob(const IoHash& Key)
const IoBuffer Buffer =
Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -234,7 +246,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -270,7 +286,11 @@ CloudCacheSession::GetObject(const IoHash& Key)
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -307,9 +327,12 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -365,6 +388,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
PutRefResult Result;
Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.ErrorCode = !Result.Success ? Response.status_code : 0;
Result.Bytes = Response.uploaded_bytes;
Result.ElapsedSeconds = Response.elapsed;
@@ -429,6 +453,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con
FinalizeRefResult Result;
Result.Success = (Response.status_code == 200 || Response.status_code == 201);
+ Result.ErrorCode = !Result.Success ? Response.status_code : 0;
Result.Bytes = Response.uploaded_bytes;
Result.ElapsedSeconds = Response.elapsed;
@@ -479,9 +504,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob)
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -514,9 +542,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -549,9 +580,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object)
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -585,7 +619,9 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ const bool Success = Response.status_code == 200;
+
+ return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success};
}
CloudCacheResult
@@ -654,7 +690,9 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ const bool Success = Response.status_code == 200;
+
+ return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success};
}
CloudCacheResult
@@ -690,7 +728,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
CloudCacheResult
@@ -726,7 +768,11 @@ CloudCacheSession::GetObjectTree(const IoHash& Key)
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .ErrorCode = !Success ? Response.status_code : 0,
+ .Success = Success};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 0118902a6..05be5f65c 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -14,6 +14,7 @@
#include <zencore/session.h>
#include <zencore/stats.h>
#include <zencore/stream.h>
+#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zenstore/cas.h>
@@ -36,6 +37,9 @@ namespace zen {
using namespace std::literals;
+static const IoBuffer EmptyBuffer;
+static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer);
+
namespace detail {
class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
@@ -108,7 +112,8 @@ namespace detail {
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
@@ -121,7 +126,8 @@ namespace detail {
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
{
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
@@ -145,7 +151,8 @@ namespace detail {
m_PendingTasks.erase(UpstreamData.TaskId);
}
- return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
@@ -178,12 +185,12 @@ namespace detail {
CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (ExistsResult.ErrorCode != 0)
+ if (!ExistsResult.Success)
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode,
- .Reason = std::move(ExistsResult.Reason)};
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"};
}
// TODO: Batch upload missing blobs
@@ -202,8 +209,8 @@ namespace detail {
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode,
- .Reason = std::move(Result.Reason)};
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"};
}
}
@@ -229,12 +236,12 @@ namespace detail {
CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (ExistsResult.ErrorCode != 0)
+ if (!ExistsResult.Success)
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode,
- .Reason = std::move(ExistsResult.Reason)};
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"};
}
// TODO: Batch upload missing objects
@@ -253,8 +260,8 @@ namespace detail {
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode,
- .Reason = std::move(Result.Reason)};
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"};
}
}
@@ -323,7 +330,7 @@ namespace detail {
CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId);
Bytes += UpdatesResult.Bytes;
ElapsedSeconds += UpdatesResult.ElapsedSeconds;
- if (UpdatesResult.ErrorCode != 0)
+ if (!UpdatesResult.Success)
{
return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
.Bytes = Bytes,
@@ -456,13 +463,6 @@ namespace detail {
Bytes += ObjectTreeResult.Bytes;
ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
- if (ObjectTreeResult.ErrorCode != 0)
- {
- return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
if (!ObjectTreeResult.Success)
{
return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
@@ -509,7 +509,7 @@ namespace detail {
CloudCacheResult ObjectResult = Session.GetObject(It.first);
Bytes += ObjectTreeResult.Bytes;
ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
- if (ObjectTreeResult.ErrorCode != 0)
+ if (!ObjectTreeResult.Success)
{
return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
.Bytes = Bytes,
@@ -533,7 +533,7 @@ namespace detail {
CloudCacheResult BlobResult = Session.GetBlob(It.first);
Bytes += ObjectTreeResult.Bytes;
ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
- if (BlobResult.ErrorCode != 0)
+ if (!BlobResult.Success)
{
return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
.Bytes = Bytes,
@@ -588,7 +588,9 @@ namespace detail {
{
return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
}
// Get Output directory node
@@ -607,7 +609,9 @@ namespace detail {
{
return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
}
// load build.output as CbObject
@@ -682,7 +686,9 @@ namespace detail {
{
return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
}
OutputPackage.SetObject(BuildOutputObject);
@@ -704,6 +710,8 @@ namespace detail {
[[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
{
+ using namespace fmt::literals;
+
std::string ExecutablePath;
std::map<std::string, std::string> Environment;
std::set<std::filesystem::path> InputFiles;
@@ -735,6 +743,15 @@ namespace detail {
}
}
+ for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv])
+ {
+ std::string_view Directory = It.AsString();
+ std::string DummyFile = "{}/.zen_empty_file"_format(Directory);
+ InputFiles.insert(DummyFile);
+ Data.Blobs[EmptyBufferId] = EmptyBuffer;
+ InputFileHashes[DummyFile] = EmptyBufferId;
+ }
+
for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
{
std::string_view Env = It.AsString();
@@ -809,18 +826,10 @@ namespace detail {
bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
// TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
- std::string Condition;
- if (HostPlatform == "Win64" || HostPlatform == "Windows")
+ std::string Condition = "Platform == '{}'"_format(HostPlatform);
+ if (HostPlatform == "Win64")
{
- Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'";
- }
- else if (HostPlatform == "Mac")
- {
- Condition = "OSFamily == 'MacOS'";
- }
- else
- {
- Condition = "OSFamily == '{}'"_format(HostPlatform);
+ Condition += " && Pool == 'Win-RemoteExec'";
}
std::map<std::string_view, int64_t> Resources;
@@ -1199,6 +1208,8 @@ public:
if (m_RunState.IsRunning)
{
+ m_ShutdownEvent.Reset();
+
for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this);
@@ -1422,11 +1433,9 @@ private:
void ProcessUpstreamUpdates()
{
- const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval);
- for (;;)
+ const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval);
+ while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count())))
{
- std::this_thread::sleep_for(UpdateSleep);
-
if (!m_RunState.IsRunning)
{
break;
@@ -1489,6 +1498,8 @@ private:
{
if (m_RunState.Stop())
{
+ m_ShutdownEvent.Set();
+
m_UpstreamQueue.CompleteAdding();
for (std::thread& Thread : m_UpstreamThreads)
{
@@ -1536,6 +1547,7 @@ private:
UpstreamApplyTasks m_ApplyTasks;
std::mutex m_ApplyTasksMutex;
std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
+ Event m_ShutdownEvent;
std::vector<std::thread> m_UpstreamThreads;
std::thread m_UpstreamUpdatesThread;
std::thread m_EndpointMonitorThread;
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index 600c8d053..2fe32112b 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -12,7 +12,6 @@ target("zenserver")
if is_plat("windows") then
add_ldflags("/subsystem:console,5.02")
add_ldflags("/MANIFEST:EMBED")
- add_ldflags("/MANIFESTUAC:level='requireAdministrator'")
add_ldflags("/LTCG")
else
del_files("windows/**")
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index d1719000b..4b849eae8 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -82,7 +82,6 @@
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
- <UACExecutionLevel>RequireAdministrator</UACExecutionLevel>
<DelayLoadDLLs>projectedfslib.dll;shell32.dll</DelayLoadDLLs>
</Link>
</ItemDefinitionGroup>
@@ -99,7 +98,6 @@
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
- <UACExecutionLevel>RequireAdministrator</UACExecutionLevel>
<DelayLoadDLLs>projectedfslib.dll;shell32.dll</DelayLoadDLLs>
</Link>
</ItemDefinitionGroup>
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index 4098954a8..e366a917f 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -89,11 +89,19 @@ ZenServerState::Initialize()
{
// TODO: there's a small chance of a race here, this logic could be tightened up with a mutex to
// ensure only a single process at a time creates the mapping
+ // TODO: the fallback to Local instead of Global has a flaw where if you start a non-elevated instance
+ // first then start an elevated instance second you'll have the first instance with a local
+ // mapping and the second instance with a global mapping. This kind of elevated/non-elevated
+ // shouldn't be common, but handling for it should be improved in the future.
if (HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Global\\ZenMap"))
{
m_hMapFile = hMap;
}
+ else if (HANDLE hLocalMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Local\\ZenMap"))
+ {
+ m_hMapFile = hLocalMap;
+ }
else
{
// Security attributes to enable any user to access state
@@ -108,6 +116,16 @@ ZenServerState::Initialize()
if (hMap == NULL)
{
+ hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file
+ Attrs.Attributes(), // allow anyone to access
+ PAGE_READWRITE, // read/write access
+ 0, // maximum object size (high-order DWORD)
+ m_MaxEntryCount * sizeof(ZenServerEntry), // maximum object size (low-order DWORD)
+ L"Local\\ZenMap"); // name of mapping object
+ }
+
+ if (hMap == NULL)
+ {
ThrowLastError("Could not open or create file mapping object for Zen server state");
}
@@ -136,6 +154,10 @@ ZenServerState::InitializeReadOnly()
{
m_hMapFile = hMap;
}
+ else if (HANDLE hLocalMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Local\\ZenMap"))
+ {
+ m_hMapFile = hLocalMap;
+ }
else
{
return false;
@@ -283,7 +305,7 @@ ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd)
if (PidEntry.load(std::memory_order::memory_order_relaxed) == 0)
{
uint32_t Expected = 0;
- if (PidEntry.compare_exchange_strong(Expected, uint16_t(PidToAdd)))
+ if (PidEntry.compare_exchange_strong(Expected, PidToAdd))
{
// Success!
return true;