diff options
| author | Per Larsson <[email protected]> | 2021-12-09 17:01:57 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-12-09 17:01:57 +0100 |
| commit | 20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6 (patch) | |
| tree | fbb0f274840a12c32d93c7e342c0427f2617a651 | |
| parent | Disabled cache tracker. (diff) | |
| parent | Return status_code as ErrorCode from jupiter api if not successful (diff) | |
| download | zen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.tar.xz zen-20f3c16b0012cfb8ce7bf9b6dd06a2720b6885c6.zip | |
Merged main.
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 47 | ||||
| -rw-r--r-- | zencore/iobuffer.cpp | 55 | ||||
| -rw-r--r-- | zenhttp/httpasio.cpp | 99 | ||||
| -rw-r--r-- | zenhttp/httpserver.cpp | 81 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 167 | ||||
| -rw-r--r-- | zenhttp/httpsys.h | 20 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpserver.h | 2 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 31 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 2 | ||||
| -rw-r--r-- | zenserver/config.cpp | 33 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 72 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 94 | ||||
| -rw-r--r-- | zenserver/xmake.lua | 1 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 24 |
16 files changed, 494 insertions, 239 deletions
diff --git a/.gitignore b/.gitignore index de0d3019b..41757ed52 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,8 @@ build/ [Bb]in/ [Oo]bj/ [Ll]og/ +vsxmake20*/ +vs20*/ # Visual Studio Code options directory .vscode/ @@ -213,6 +215,7 @@ __pycache__/ .minio_data/ .test/ .xmake/ +.gdb_history # Tags TAGS diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 04b3b33dd..9d18a55e9 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -2,6 +2,7 @@ #pragma once +#include <atomic> #include <memory.h> #include <zencore/memory.h> #include "refcount.h" @@ -111,39 +112,39 @@ public: inline void EnsureDataValid() const { - if ((m_Flags & kIsExtended) && !(m_Flags & kIsMaterialized)) + const uint32_t LocalFlags = m_Flags.load(std::memory_order_acquire); + if ((LocalFlags & kIsExtended) && !(LocalFlags & kIsMaterialized)) { Materialize(); } } - inline bool IsOwnedByThis() const { return !!(m_Flags & kIsOwnedByThis); } + inline bool IsOwnedByThis() const { return !!(m_Flags.load(std::memory_order_relaxed) & kIsOwnedByThis); } inline void SetIsOwnedByThis(bool NewState) { if (NewState) { - m_Flags |= kIsOwnedByThis; + m_Flags.fetch_or(kIsOwnedByThis, std::memory_order_relaxed); } else { - m_Flags &= ~kIsOwnedByThis; + m_Flags.fetch_and(~kIsOwnedByThis, std::memory_order_relaxed); } } inline bool IsOwned() const { - bool ThisIsOwned = !!(m_Flags & kIsOwnedByThis); - if (ThisIsOwned) + if (IsOwnedByThis()) { return true; } return m_OuterCore && m_OuterCore->IsOwned(); } - inline bool IsImmutable() const { return (m_Flags & kIsMutable) == 0; } - inline bool IsWholeFile() const { return (m_Flags & kIsWholeFile) != 0; } - inline bool IsNull() const { return (m_Flags & kIsNull) != 0; } + inline bool IsImmutable() const { return (m_Flags.load(std::memory_order_relaxed) & kIsMutable) == 0; } + inline bool IsWholeFile() const { return (m_Flags.load(std::memory_order_relaxed) & kIsWholeFile) != 0; } + inline bool IsNull() const { return (m_Flags.load(std::memory_order_relaxed) & kIsNull) != 0; } inline IoBufferExtendedCore* ExtendedCore(); inline const IoBufferExtendedCore* ExtendedCore() const; @@ -168,11 +169,11 @@ public: { if (!NewState) { - m_Flags |= kIsMutable; + m_Flags.fetch_or(kIsMutable, std::memory_order_relaxed); } else { - m_Flags &= ~kIsMutable; + m_Flags.fetch_and(~kIsMutable, std::memory_order_relaxed); } } @@ -180,27 +181,35 @@ public: { if (NewState) { - m_Flags |= kIsWholeFile; + m_Flags.fetch_or(kIsWholeFile, std::memory_order_relaxed); } else { - m_Flags |= ~kIsWholeFile; + m_Flags.fetch_and(~kIsWholeFile, std::memory_order_relaxed); } } inline void SetContentType(ZenContentType ContentType) { ZEN_ASSERT_SLOW((uint32_t(ContentType) & kContentTypeMask) == uint32_t(ContentType)); - m_Flags = (m_Flags & ~(kContentTypeMask << kContentTypeShift)) | (uint32_t(ContentType) << kContentTypeShift); + uint32_t OldValue = m_Flags.load(std::memory_order_relaxed); + uint32_t NewValue; + do + { + NewValue = (OldValue & ~(kContentTypeMask << kContentTypeShift)) | (uint32_t(ContentType) << kContentTypeShift); + } while (!m_Flags.compare_exchange_weak(OldValue, NewValue, std::memory_order_relaxed, std::memory_order_relaxed)); } - inline ZenContentType GetContentType() const { return ZenContentType((m_Flags >> kContentTypeShift) & kContentTypeMask); } + inline ZenContentType GetContentType() const + { + return ZenContentType((m_Flags.load(std::memory_order_relaxed) >> kContentTypeShift) & kContentTypeMask); + } inline uint32_t GetRefCount() const { return m_RefCount; } protected: uint32_t m_RefCount = 0; - mutable uint32_t m_Flags = 0; + mutable std::atomic<uint32_t> m_Flags{0}; mutable const void* m_DataPtr = nullptr; size_t m_DataBytes = 0; RefPtr<const IoBufferCore> m_OuterCore; @@ -213,7 +222,7 @@ protected: static_assert((uint32_t(ZenContentType::kUnknownContentType) & ~kContentTypeMask) == 0); - enum Flags + enum Flags : uint32_t { kIsNull = 1 << 0, // This is a null IoBuffer kIsMutable = 1 << 1, @@ -266,7 +275,7 @@ private: inline IoBufferExtendedCore* IoBufferCore::ExtendedCore() { - if (m_Flags & kIsExtended) + if (m_Flags.load(std::memory_order_relaxed) & kIsExtended) { return static_cast<IoBufferExtendedCore*>(this); } @@ -277,7 +286,7 @@ IoBufferCore::ExtendedCore() inline const IoBufferExtendedCore* IoBufferCore::ExtendedCore() const { - if (m_Flags & kIsExtended) + if (m_Flags.load(std::memory_order_relaxed) & kIsExtended) { return static_cast<const IoBufferExtendedCore*>(this); } diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index 6cee3f60d..e2c112a79 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -39,7 +39,7 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment) #if ZEN_PLATFORM_WINDOWS if (((InSize & 0xffFF) == 0) && (Alignment == 0x10000)) { - m_Flags |= kLowLevelAlloc; + m_Flags.fetch_or(kLowLevelAlloc, std::memory_order_relaxed); m_DataPtr = VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE); return; @@ -48,7 +48,7 @@ IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment) #if ZEN_USE_MIMALLOC void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment)); - m_Flags |= kIoBufferAlloc; + m_Flags.fetch_or(kIoBufferAlloc, std::memory_order_relaxed); #else void* Ptr = Memory::Alloc(InSize, Alignment); #endif @@ -66,8 +66,9 @@ IoBufferCore::FreeBuffer() return; } + const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed); #if ZEN_PLATFORM_WINDOWS - if (m_Flags & kLowLevelAlloc) + if (LocalFlags & kLowLevelAlloc) { VirtualFree(const_cast<void*>(m_DataPtr), 0, MEM_DECOMMIT); @@ -76,7 +77,7 @@ IoBufferCore::FreeBuffer() #endif // ZEN_PLATFORM_WINDOWS #if ZEN_USE_MIMALLOC - if (m_Flags & kIoBufferAlloc) + if (LocalFlags & kIoBufferAlloc) { return mi_free(const_cast<void*>(m_DataPtr)); } @@ -170,12 +171,13 @@ IoBufferExtendedCore::IoBufferExtendedCore(void* FileHandle, uint64_t Offset, ui , m_FileHandle(FileHandle) , m_FileOffset(Offset) { - m_Flags |= kIsOwnedByThis | kIsExtended; + uint32_t NewFlags = kIsOwnedByThis | kIsExtended; if (TransferHandleOwnership) { - m_Flags |= kOwnsFile; + NewFlags |= kOwnsFile; } + m_Flags.fetch_or(NewFlags, std::memory_order_relaxed); } IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, uint64_t Offset, uint64_t Size) @@ -183,7 +185,7 @@ IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, ui , m_FileHandle(Outer->m_FileHandle) , m_FileOffset(Outer->m_FileOffset + Offset) { - m_Flags |= kIsOwnedByThis | kIsExtended; + m_Flags.fetch_or(kIsOwnedByThis | kIsExtended, std::memory_order_relaxed); } IoBufferExtendedCore::~IoBufferExtendedCore() @@ -198,14 +200,15 @@ IoBufferExtendedCore::~IoBufferExtendedCore() #endif } + const uint32_t LocalFlags = m_Flags.load(std::memory_order_relaxed); #if ZEN_PLATFORM_WINDOWS - if (m_Flags & kOwnsMmap) + if (LocalFlags & kOwnsMmap) { CloseHandle(m_MmapHandle); } #endif - if (m_Flags & kOwnsFile) + if (LocalFlags & kOwnsFile) { #if ZEN_PLATFORM_WINDOWS BOOL Success = CloseHandle(m_FileHandle); @@ -233,43 +236,46 @@ IoBufferExtendedCore::Materialize() const // The synchronization scheme here is very primitive, if we end up with // a lot of contention we can make it more fine-grained - if (m_MmapHandle) + if (m_Flags.load(std::memory_order_acquire) & kIsMaterialized) return; RwLock::ExclusiveLockScope _(g_MappingLock); // Someone could have gotten here first - if (m_MmapHandle) + // We can use memory_order_relaxed on this load because the mutex has already provided the fence + if (m_Flags.load(std::memory_order_relaxed) & kIsMaterialized) return; + uint32_t NewFlags = kIsMaterialized; + const uint64_t MapOffset = m_FileOffset & ~0xffffull; const uint64_t MappedOffsetDisplacement = m_FileOffset - MapOffset; const uint64_t MapSize = m_DataBytes + MappedOffsetDisplacement; #if ZEN_PLATFORM_WINDOWS - m_MmapHandle = CreateFileMapping(m_FileHandle, - /* lpFileMappingAttributes */ nullptr, - /* flProtect */ PAGE_READONLY, - /* dwMaximumSizeLow */ 0, - /* dwMaximumSizeHigh */ 0, - /* lpName */ nullptr); - - if (m_MmapHandle == nullptr) + void* NewMmapHandle = CreateFileMapping(m_FileHandle, + /* lpFileMappingAttributes */ nullptr, + /* flProtect */ PAGE_READONLY, + /* dwMaximumSizeLow */ 0, + /* dwMaximumSizeHigh */ 0, + /* lpName */ nullptr); + + if (NewMmapHandle == nullptr) { throw std::system_error(std::error_code(::GetLastError(), std::system_category()), "CreateFileMapping failed on file '{}'"_format(zen::PathFromHandle(m_FileHandle))); } - m_Flags |= kOwnsMmap; + NewFlags |= kOwnsMmap; - void* MappedBase = MapViewOfFile(m_MmapHandle, + void* MappedBase = MapViewOfFile(NewMmapHandle, /* dwDesiredAccess */ FILE_MAP_READ, /* FileOffsetHigh */ uint32_t(MapOffset >> 32), /* FileOffsetLow */ uint32_t(MapOffset & 0xffFFffFFu), /* dwNumberOfBytesToMap */ MapSize); #else - m_MmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0) - m_Flags |= kOwnsMmap; + NewMmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0) + NewFlags |= kOwnsMmap; void* MappedBase = mmap( /* addr */ nullptr, @@ -289,8 +295,9 @@ IoBufferExtendedCore::Materialize() const m_MappedPointer = MappedBase; m_DataPtr = reinterpret_cast<uint8_t*>(MappedBase) + MappedOffsetDisplacement; + m_MmapHandle = NewMmapHandle; - m_Flags |= kIsMaterialized; + m_Flags.fetch_or(NewFlags, std::memory_order_release); } bool diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index d5fe9adbb..7ee7193d1 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -15,6 +15,14 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <asio.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#define ASIO_VERBOSE_TRACE 0 + +#if ASIO_VERBOSE_TRACE +#define ZEN_TRACE_VERBOSE ZEN_TRACE +#else +#define ZEN_TRACE_VERBOSE(fmtstr, ...) +#endif + namespace zen::asio_http { using namespace std::literals; @@ -114,7 +122,7 @@ struct HttpRequest HttpVerb RequestVerb() const { return m_RequestVerb; } bool IsKeepAlive() const { return m_KeepAlive; } - std::string_view Url() const { return std::string_view(m_Url, m_UrlLength); } + std::string_view Url() const { return m_NormalizedUrl.empty() ? std::string_view(m_Url, m_UrlLength) : m_NormalizedUrl; } std::string_view QueryString() const { return std::string_view(m_QueryString, m_QueryLength); } IoBuffer Body() { return m_BodyBuffer; } @@ -171,6 +179,7 @@ private: uint64_t m_BodyPosition = 0; http_parser m_Parser; char m_HeaderBuffer[1024]; + std::string m_NormalizedUrl; void AppendInputBytes(const char* Data, size_t Bytes); void AppendCurrentHeader(); @@ -318,6 +327,7 @@ private: std::unique_ptr<asio::ip::tcp::socket> m_Socket; std::atomic<uint32_t> m_RequestCounter{0}; uint32_t m_ConnectionId = 0; + Ref<IHttpPackageHandler> m_PackageHandler; RwLock m_ResponsesLock; std::deque<std::unique_ptr<HttpResponse>> m_Responses; @@ -330,12 +340,12 @@ HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::uniq , m_Socket(std::move(Socket)) , m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) { - ZEN_TRACE("new connection #{}", m_ConnectionId); + ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { - ZEN_TRACE("destroying connection #{}", m_ConnectionId); + ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } void @@ -371,18 +381,18 @@ HttpServerConnection::EnqueueRead() asio::async_read(*m_Socket.get(), m_RequestBuffer, - asio::transfer_at_least(16), + asio::transfer_at_least(1), [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnDataReceived(Ec, ByteCount); }); } void -HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount) +HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount) { if (Ec) { if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kInitialRead) { - ZEN_TRACE("on data received ERROR (EXPECTED), connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + ZEN_TRACE_VERBOSE("on data received ERROR (EXPECTED), connection '{}' reason '{}'", m_ConnectionId, Ec.message()); return; } else @@ -392,7 +402,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t Byt } } - ZEN_TRACE("on data received, connection '{}', request '{}', thread '{}', bytes '{}'", + ZEN_TRACE_VERBOSE("on data received, connection '{}', request '{}', thread '{}', bytes '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed), GetCurrentThreadId(), @@ -421,7 +431,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, std::size_t Byt } void -HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop) +HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop) { if (Ec) { @@ -430,7 +440,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, std::size_t } else { - ZEN_TRACE("on data sent, connection '{}', request '{}', thread '{}', bytes '{}'", + ZEN_TRACE_VERBOSE("on data sent, connection '{}', request '{}', thread '{}', bytes '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed), GetCurrentThreadId(), @@ -485,9 +495,23 @@ HttpServerConnection::HandleRequest() { HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); - ZEN_TRACE("handle request, connection '{}' request '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed)); - Service->HandleRequest(Request); + ZEN_TRACE_VERBOSE("handle request, connection '{}' request '{}'", m_ConnectionId, m_RequestCounter.load(std::memory_order_relaxed)); + + if (!HandlePackageOffers(*Service, Request, m_PackageHandler)) + { + try + { + Service->HandleRequest(Request); + } + catch (std::exception& ex) + { + ZEN_ERROR("Caught exception while handling request: '{}'", ex.what()); + + Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, ex.what()); + } + + } if (std::unique_ptr<HttpResponse> Response = std::move(Request.m_Response)) { @@ -737,6 +761,37 @@ HttpRequest::TerminateConnection() m_Connection.TerminateConnection(); } +static void +NormalizeUrlPath(const char* Url, size_t UrlLength, std::string& NormalizedUrl) +{ + bool LastCharWasSeparator = false; + for (std::string_view::size_type UrlIndex = 0; UrlIndex < UrlLength; ++UrlIndex) + { + const char UrlChar = Url[UrlIndex]; + const bool IsSeparator = (UrlChar == '/'); + + if (IsSeparator && LastCharWasSeparator) + { + if (NormalizedUrl.empty()) + { + NormalizedUrl.reserve(UrlLength); + NormalizedUrl.append(Url, UrlIndex); + } + + if (!LastCharWasSeparator) + { + NormalizedUrl.push_back('/'); + } + } + else if (!NormalizedUrl.empty()) + { + NormalizedUrl.push_back(UrlChar); + } + + LastCharWasSeparator = IsSeparator; + } +} + int HttpRequest::OnHeadersComplete() { @@ -803,6 +858,9 @@ HttpRequest::OnHeadersComplete() m_QueryLength = Url.size() - QuerySplit - 1; } + NormalizeUrlPath(m_Url, m_UrlLength, m_NormalizedUrl); + + return 0; } @@ -843,6 +901,7 @@ HttpRequest::ResetState() m_BodyBuffer = {}; m_BodyPosition = 0; m_Headers.clear(); + m_NormalizedUrl.clear(); } int @@ -935,7 +994,7 @@ HttpAsioServerRequest::HttpAsioServerRequest(asio_http::HttpRequest& Request, Ht const int PrefixLength = Service.UriPrefixLength(); std::string_view Uri = Request.Url(); - Uri.remove_prefix(PrefixLength); + Uri.remove_prefix(std::min(PrefixLength, static_cast<int>(Uri.size()))); m_Uri = Uri; m_QueryString = Request.QueryString(); @@ -1099,6 +1158,10 @@ HttpAsioServerImpl::RegisterService(const char* InUrlPath, HttpService& Service) { std::string_view UrlPath(InUrlPath); Service.SetUriPrefixLength(UrlPath.size()); + if (!UrlPath.empty() && UrlPath.back() == '/') + { + UrlPath.remove_suffix(1); + } RwLock::ExclusiveLockScope _(m_Lock); m_UriHandlers.push_back({std::string(UrlPath), &Service}); @@ -1109,16 +1172,22 @@ HttpAsioServerImpl::RouteRequest(std::string_view Url) { RwLock::SharedLockScope _(m_Lock); + HttpService* CandidateService = nullptr; + std::string::size_type CandidateMatchSize = 0; for (const ServiceEntry& SvcEntry : m_UriHandlers) { const std::string& SvcUrl = SvcEntry.ServiceUrlPath; - if (Url.compare(0, SvcUrl.size(), SvcUrl) == 0) + const std::string::size_type SvcUrlSize = SvcUrl.size(); + if ((SvcUrlSize >= CandidateMatchSize) && + Url.compare(0, SvcUrlSize, SvcUrl) == 0 && + ((SvcUrlSize == Url.size()) || (Url[SvcUrlSize] == '/'))) { - return SvcEntry.Service; + CandidateMatchSize = SvcUrl.size(); + CandidateService = SvcEntry.Service; } } - return nullptr; + return CandidateService; } } // namespace zen::asio_http diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 011468715..3326f3d4a 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -7,6 +7,7 @@ #include "httpsys.h" #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> @@ -637,6 +638,86 @@ CreateHttpServer(std::string_view ServerClass) ////////////////////////////////////////////////////////////////////////// +bool +HandlePackageOffers(HttpService& Service, HttpServerRequest& Request, Ref<IHttpPackageHandler>& PackageHandlerRef) +{ + if (Request.RequestVerb() == HttpVerb::kPost) + { + if (Request.RequestContentType() == HttpContentType::kCbPackageOffer) + { + // The client is presenting us with a package attachments offer, we need + // to filter it down to the list of attachments we need them to send in + // the follow-up request + + PackageHandlerRef = Service.HandlePackageRequest(Request); + + if (PackageHandlerRef) + { + CbObject OfferMessage = LoadCompactBinaryObject(Request.ReadPayload()); + + std::vector<IoHash> OfferCids; + + for (auto& CidEntry : OfferMessage["offer"]) + { + if (!CidEntry.IsHash()) + { + // Should yield bad request response? + + ZEN_WARN("found invalid entry in offer"); + + continue; + } + + OfferCids.push_back(CidEntry.AsHash()); + } + + ZEN_TRACE("request #{} -> filtering offer of {} entries", Request.RequestId(), OfferCids.size()); + + PackageHandlerRef->FilterOffer(OfferCids); + + ZEN_TRACE("request #{} -> filtered to {} entries", Request.RequestId(), OfferCids.size()); + + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + for (const IoHash& Cid : OfferCids) + { + ResponseWriter.AddHash(Cid); + } + + ResponseWriter.EndArray(); + + // Emit filter response + Request.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); + return true; + } + } + else if (Request.RequestContentType() == HttpContentType::kCbPackage) + { + // Process chunks in package request + + PackageHandlerRef = Service.HandlePackageRequest(Request); + + // TODO: this should really be done in a streaming fashion, currently this emulates + // the intended flow from an API perspective + + if (PackageHandlerRef) + { + PackageHandlerRef->OnRequestBegin(); + + auto CreateBuffer = [&](const IoHash& Cid, uint64_t Size) -> IoBuffer { return PackageHandlerRef->CreateTarget(Cid, Size); }; + + CbPackage Package = ParsePackageMessage(Request.ReadPayload(), CreateBuffer); + + PackageHandlerRef->OnRequestComplete(); + } + } + } + return false; +} + +////////////////////////////////////////////////////////////////////////// + #if ZEN_WITH_TESTS TEST_CASE("http.common") diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index cdf9e0a39..e9472e3b8 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -748,15 +748,20 @@ HttpSysServer::~HttpSysServer() } void -HttpSysServer::Initialize(const wchar_t* UrlPath) +HttpSysServer::InitializeServer(int BasePort) { + using namespace std::literals; + + WideStringBuilder<64> WildcardUrlPath; + WildcardUrlPath << u8"http://*:"sv << int64_t(BasePort) << u8"/"sv; + m_IsOk = false; ULONG Result = HttpCreateServerSession(HTTPAPI_VERSION_2, &m_HttpSessionId, 0); if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(UrlPath), Result); + ZEN_ERROR("Failed to create server session for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); return; } @@ -765,18 +770,44 @@ HttpSysServer::Initialize(const wchar_t* UrlPath) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(UrlPath), Result); + ZEN_ERROR("Failed to create URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); return; } - m_BaseUri = UrlPath; + Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, WildcardUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0); - Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, UrlPath, HTTP_URL_CONTEXT(0), 0); + m_BaseUris.clear(); + if (Result == NO_ERROR) + { + m_BaseUris.push_back(WildcardUrlPath.c_str()); + } + else + { + // If we can't register the wildcard path, we fall back to local paths + // This local paths allow requests originating locally to function, but will not allow + // remote origin requests to function. This can be remedied by using netsh + // during an install process to grant permissions to route public access to the appropriate + // port for the current user. eg: + // netsh http add urlacl url=http://*:1337/ user=<some_user> - if (Result != NO_ERROR) + const std::u8string_view Hosts[] = { u8"[::1]"sv, u8"localhost"sv, u8"127.0.0.1"sv }; + + for (const std::u8string_view Host : Hosts) + { + WideStringBuilder<64> LocalUrlPath; + LocalUrlPath << u8"http://"sv << Host << u8":"sv << int64_t(BasePort) << u8"/"sv; + + if (HttpAddUrlToUrlGroup(m_HttpUrlGroupId, LocalUrlPath.c_str(), HTTP_URL_CONTEXT(0), 0) == NO_ERROR) + { + m_BaseUris.push_back(LocalUrlPath.c_str()); + } + } + } + + if (m_BaseUris.empty()) { - ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(UrlPath), Result); + ZEN_ERROR("Failed to add base URL to URL group for '{}': {:#x}", WideToUtf8(WildcardUrlPath), Result); return; } @@ -791,7 +822,7 @@ HttpSysServer::Initialize(const wchar_t* UrlPath) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(UrlPath), Result); + ZEN_ERROR("Failed to create request queue for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); return; } @@ -803,7 +834,7 @@ HttpSysServer::Initialize(const wchar_t* UrlPath) if (Result != NO_ERROR) { - ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(UrlPath), Result); + ZEN_ERROR("Failed to set server binding property for '{}': {:#x}", WideToUtf8(m_BaseUris.front()), Result); return; } @@ -815,13 +846,13 @@ HttpSysServer::Initialize(const wchar_t* UrlPath) if (ErrorCode) { - ZEN_ERROR("Failed to create IOCP for '{}': {}", WideToUtf8(UrlPath), ErrorCode.message()); + ZEN_ERROR("Failed to create IOCP for '{}': {}", WideToUtf8(m_BaseUris.front()), ErrorCode.message()); } else { m_IsOk = true; - ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(UrlPath)); + ZEN_INFO("Started http.sys server at '{}'", WideToUtf8(m_BaseUris.front())); } } @@ -952,15 +983,18 @@ HttpSysServer::RegisterService(const char* UrlPath, HttpService& Service) // Convert to wide string - std::wstring Url16 = m_BaseUri + PathUtf16; + for (const std::wstring& BaseUri : m_BaseUris) + { + std::wstring Url16 = BaseUri + PathUtf16; - ULONG Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, Url16.c_str(), HTTP_URL_CONTEXT(&Service), 0 /* Reserved */); + ULONG Result = HttpAddUrlToUrlGroup(m_HttpUrlGroupId, Url16.c_str(), HTTP_URL_CONTEXT(&Service), 0 /* Reserved */); - if (Result != NO_ERROR) - { - ZEN_ERROR("HttpAddUrlToUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result)); + if (Result != NO_ERROR) + { + ZEN_ERROR("HttpAddUrlToUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result)); - return; + return; + } } } @@ -978,13 +1012,16 @@ HttpSysServer::UnregisterService(const char* UrlPath, HttpService& Service) // Convert to wide string - std::wstring Url16 = m_BaseUri + PathUtf16; + for (const std::wstring& BaseUri : m_BaseUris) + { + std::wstring Url16 = BaseUri + PathUtf16; - ULONG Result = HttpRemoveUrlFromUrlGroup(m_HttpUrlGroupId, Url16.c_str(), 0); + ULONG Result = HttpRemoveUrlFromUrlGroup(m_HttpUrlGroupId, Url16.c_str(), 0); - if (Result != NO_ERROR) - { - ZEN_ERROR("HttpRemoveUrlFromUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result)); + if (Result != NO_ERROR) + { + ZEN_ERROR("HttpRemoveUrlFromUrlGroup failed with result: '{}'", GetSystemErrorAsString(Result)); + } } } @@ -1131,83 +1168,12 @@ HttpSysTransaction::InvokeRequestHandler(HttpService& Service, IoBuffer Payload) { HttpSysServerRequest& ThisRequest = m_HandlerRequest.emplace(*this, Service, Payload); - if (ThisRequest.RequestVerb() == HttpVerb::kPost) - { - if (ThisRequest.RequestContentType() == HttpContentType::kCbPackageOffer) - { - // The client is presenting us with a package attachments offer, we need - // to filter it down to the list of attachments we need them to send in - // the follow-up request - - m_PackageHandler = Service.HandlePackageRequest(ThisRequest); - - if (m_PackageHandler) - { - CbObject OfferMessage = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> OfferCids; - - for (auto& CidEntry : OfferMessage["offer"]) - { - if (!CidEntry.IsHash()) - { - // Should yield bad request response? - - ZEN_WARN("found invalid entry in offer"); - - continue; - } - - OfferCids.push_back(CidEntry.AsHash()); - } - - ZEN_TRACE("request #{} -> filtering offer of {} entries", ThisRequest.RequestId(), OfferCids.size()); - - m_PackageHandler->FilterOffer(OfferCids); - - ZEN_TRACE("request #{} -> filtered to {} entries", ThisRequest.RequestId(), OfferCids.size()); - - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - for (const IoHash& Cid : OfferCids) - { - ResponseWriter.AddHash(Cid); - } - - ResponseWriter.EndArray(); - - // Emit filter response - ThisRequest.WriteResponse(HttpResponseCode::OK, ResponseWriter.Save()); - - return ThisRequest; - } - } - else if (ThisRequest.RequestContentType() == HttpContentType::kCbPackage) - { - // Process chunks in package request - - m_PackageHandler = Service.HandlePackageRequest(ThisRequest); - - // TODO: this should really be done in a streaming fashion, currently this emulates - // the intended flow from an API perspective - - if (m_PackageHandler) - { - m_PackageHandler->OnRequestBegin(); - - auto CreateBuffer = [&](const IoHash& Cid, uint64_t Size) -> IoBuffer { return m_PackageHandler->CreateTarget(Cid, Size); }; - - CbPackage Package = ParsePackageMessage(ThisRequest.ReadPayload(), CreateBuffer); - - m_PackageHandler->OnRequestComplete(); - } - } - } - // Default request handling - Service.HandleRequest(ThisRequest); + if (!HandlePackageOffers(Service, ThisRequest, m_PackageHandler)) + { + Service.HandleRequest(ThisRequest); + } return ThisRequest; } @@ -1641,12 +1607,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT void HttpSysServer::Initialize(int BasePort) { - using namespace std::literals; - - WideStringBuilder<64> BaseUri; - BaseUri << u8"http://*:"sv << int64_t(BasePort) << u8"/"sv; - - Initialize(BaseUri.c_str()); + InitializeServer(BasePort); StartServer(); } diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index 46ba122cc..7df8fba8f 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -52,7 +52,7 @@ public: inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } private: - void Initialize(const wchar_t* UrlPath); + void InitializeServer(int BasePort); void Cleanup(); void StartServer(); @@ -75,15 +75,15 @@ private: WinIoThreadPool m_ThreadPool; WorkerThreadPool m_AsyncWorkPool; - std::wstring m_BaseUri; // http://*:nnnn/ - HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; - HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0; - HANDLE m_RequestQueueHandle = 0; - std::atomic_int32_t m_PendingRequests{0}; - std::atomic<int32_t> m_IsShuttingDown{0}; - int32_t m_MinPendingRequests = 16; - int32_t m_MaxPendingRequests = 128; - Event m_ShutdownEvent; + std::vector<std::wstring> m_BaseUris; // eg: http://*:nnnn/ + HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; + HTTP_URL_GROUP_ID m_HttpUrlGroupId = 0; + HANDLE m_RequestQueueHandle = 0; + std::atomic_int32_t m_PendingRequests{0}; + std::atomic_int32_t m_IsShuttingDown{0}; + int32_t m_MinPendingRequests = 16; + int32_t m_MaxPendingRequests = 128; + Event m_ShutdownEvent; }; } // namespace zen diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 19b4c63d6..b32359d67 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -303,6 +303,8 @@ private: std::map<std::string, RpcFunction> m_Functions; }; +bool HandlePackageOffers(HttpService& Service, HttpServerRequest& Request, Ref<IHttpPackageHandler>& PackageHandlerRef); + void http_forcelink(); // internal } // namespace zen diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 1f18b054f..ae4dd4528 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -578,9 +578,13 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, if (NeedList.empty()) { // We already have everything + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); - CbObject Output = ExecActionUpstream(Worker, RequestObject); - + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -638,8 +642,13 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); - CbObject Output = ExecActionUpstream(Worker, ActionObj); + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } break; @@ -881,8 +890,8 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) return OutputPackage; } -CbObject -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action) +HttpResponseCode +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) { const IoHash WorkerId = Worker.Descriptor.GetHash(); const IoHash ActionId = Action.GetHash(); @@ -895,14 +904,19 @@ HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Actio if (!EnqueueResult.Success) { - throw std::runtime_error("Error enqueuing upstream task"); + ZEN_ERROR( + "Error enqueuing upstream Action {}/{}", + WorkerId.ToHexString(), + ActionId.ToHexString()); + return HttpResponseCode::InternalServerError; } CbObjectWriter Writer; Writer.AddHash("worker", WorkerId); Writer.AddHash("action", ActionId); - return std::move(Writer.Save()); + Object = std::move(Writer.Save()); + return HttpResponseCode::OK; } HttpResponseCode @@ -932,8 +946,7 @@ HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHa Completed.Error.Reason, Completed.Error.ErrorCode); - throw std::runtime_error( - "Action {}/{} failed"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::InternalServerError; } ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 15cda4750..0d6edf119 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -46,7 +46,7 @@ private: [[nodiscard]] std::filesystem::path CreateNewSandbox(); [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); - [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object); [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); RwLock m_WorkerLock; diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 8d9339e92..23450c370 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -45,6 +45,39 @@ PickDefaultStateDirectory() return myDocumentsDir; } + int CandidateDriveLetterOffset = -1; + ULARGE_INTEGER CandidateDriveSize; + CandidateDriveSize.QuadPart = 0L; + DWORD LogicalDrives = GetLogicalDrives(); + char CandidateDriveName[] = "A:\\"; + for (int DriveLetterOffset = 0; DriveLetterOffset < 32; ++DriveLetterOffset) + { + if ((LogicalDrives & (1 << DriveLetterOffset)) != 0) + { + CandidateDriveName[0] = (char)('A' + DriveLetterOffset); + if (GetDriveTypeA(CandidateDriveName) == DRIVE_FIXED) + { + ULARGE_INTEGER FreeBytesAvailableToCaller; + ULARGE_INTEGER TotalNumberOfBytes; + if (0 != GetDiskFreeSpaceExA(CandidateDriveName, &FreeBytesAvailableToCaller, &TotalNumberOfBytes, nullptr)) + { + if ((FreeBytesAvailableToCaller.QuadPart > 0) && (TotalNumberOfBytes.QuadPart > CandidateDriveSize.QuadPart)) + { + CandidateDriveLetterOffset = DriveLetterOffset; + CandidateDriveSize = TotalNumberOfBytes; + } + } + } + } + } + + if (CandidateDriveLetterOffset >= 0) + { + char RootZenFolderName[] = "A:\\zen"; + RootZenFolderName[0] = (char)('A' + CandidateDriveLetterOffset); + return RootZenFolderName; + } + return L""; } diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 1f82f4a04..f9be068ec 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -116,7 +116,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -161,7 +165,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -198,7 +206,11 @@ CloudCacheSession::GetBlob(const IoHash& Key) const IoBuffer Buffer = Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -234,7 +246,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -270,7 +286,11 @@ CloudCacheSession::GetObject(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -307,9 +327,12 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -365,6 +388,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer PutRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.ErrorCode = !Result.Success ? Response.status_code : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -429,6 +453,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con FinalizeRefResult Result; Result.Success = (Response.status_code == 200 || Response.status_code == 201); + Result.ErrorCode = !Result.Success ? Response.status_code : 0; Result.Bytes = Response.uploaded_bytes; Result.ElapsedSeconds = Response.elapsed; @@ -479,9 +504,12 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -514,9 +542,12 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -549,9 +580,12 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -585,7 +619,9 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + const bool Success = Response.status_code == 200; + + return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; } CloudCacheResult @@ -654,7 +690,9 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; } - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + const bool Success = Response.status_code == 200; + + return {.ElapsedSeconds = Response.elapsed, .ErrorCode = !Success ? Response.status_code : 0, .Success = Success}; } CloudCacheResult @@ -690,7 +728,11 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } CloudCacheResult @@ -726,7 +768,11 @@ CloudCacheSession::GetObjectTree(const IoHash& Key) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .ErrorCode = !Success ? Response.status_code : 0, + .Success = Success}; } std::vector<IoHash> diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 0118902a6..05be5f65c 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -14,6 +14,7 @@ #include <zencore/session.h> #include <zencore/stats.h> #include <zencore/stream.h> +#include <zencore/thread.h> #include <zencore/timer.h> #include <zenstore/cas.h> @@ -36,6 +37,9 @@ namespace zen { using namespace std::literals; +static const IoBuffer EmptyBuffer; +static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer); + namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint @@ -108,7 +112,8 @@ namespace detail { ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -121,7 +126,8 @@ namespace detail { ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) { - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -145,7 +151,8 @@ namespace detail { m_PendingTasks.erase(UpstreamData.TaskId); } - return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } @@ -178,12 +185,12 @@ namespace detail { CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (ExistsResult.ErrorCode != 0) + if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode, - .Reason = std::move(ExistsResult.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"}; } // TODO: Batch upload missing blobs @@ -202,8 +209,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode, - .Reason = std::move(Result.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"}; } } @@ -229,12 +236,12 @@ namespace detail { CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); ElapsedSeconds += ExistsResult.ElapsedSeconds; - if (ExistsResult.ErrorCode != 0) + if (!ExistsResult.Success) { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode, - .Reason = std::move(ExistsResult.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"}; } // TODO: Batch upload missing objects @@ -253,8 +260,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = Result.ErrorCode, - .Reason = std::move(Result.Reason)}; + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"}; } } @@ -323,7 +330,7 @@ namespace detail { CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; - if (UpdatesResult.ErrorCode != 0) + if (!UpdatesResult.Success) { return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, .Bytes = Bytes, @@ -456,13 +463,6 @@ namespace detail { Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (ObjectTreeResult.ErrorCode != 0) - { - return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - if (!ObjectTreeResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, @@ -509,7 +509,7 @@ namespace detail { CloudCacheResult ObjectResult = Session.GetObject(It.first); Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (ObjectTreeResult.ErrorCode != 0) + if (!ObjectTreeResult.Success) { return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, .Bytes = Bytes, @@ -533,7 +533,7 @@ namespace detail { CloudCacheResult BlobResult = Session.GetBlob(It.first); Bytes += ObjectTreeResult.Bytes; ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (BlobResult.ErrorCode != 0) + if (!BlobResult.Success) { return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, .Bytes = Bytes, @@ -588,7 +588,9 @@ namespace detail { { return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } // Get Output directory node @@ -607,7 +609,9 @@ namespace detail { { return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } // load build.output as CbObject @@ -682,7 +686,9 @@ namespace detail { { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; } OutputPackage.SetObject(BuildOutputObject); @@ -704,6 +710,8 @@ namespace detail { [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) { + using namespace fmt::literals; + std::string ExecutablePath; std::map<std::string, std::string> Environment; std::set<std::filesystem::path> InputFiles; @@ -735,6 +743,15 @@ namespace detail { } } + for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv]) + { + std::string_view Directory = It.AsString(); + std::string DummyFile = "{}/.zen_empty_file"_format(Directory); + InputFiles.insert(DummyFile); + Data.Blobs[EmptyBufferId] = EmptyBuffer; + InputFileHashes[DummyFile] = EmptyBufferId; + } + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) { std::string_view Env = It.AsString(); @@ -809,18 +826,10 @@ namespace detail { bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) - std::string Condition; - if (HostPlatform == "Win64" || HostPlatform == "Windows") + std::string Condition = "Platform == '{}'"_format(HostPlatform); + if (HostPlatform == "Win64") { - Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'"; - } - else if (HostPlatform == "Mac") - { - Condition = "OSFamily == 'MacOS'"; - } - else - { - Condition = "OSFamily == '{}'"_format(HostPlatform); + Condition += " && Pool == 'Win-RemoteExec'"; } std::map<std::string_view, int64_t> Resources; @@ -1199,6 +1208,8 @@ public: if (m_RunState.IsRunning) { + m_ShutdownEvent.Reset(); + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); @@ -1422,11 +1433,9 @@ private: void ProcessUpstreamUpdates() { - const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval); - for (;;) + const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval); + while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count()))) { - std::this_thread::sleep_for(UpdateSleep); - if (!m_RunState.IsRunning) { break; @@ -1489,6 +1498,8 @@ private: { if (m_RunState.Stop()) { + m_ShutdownEvent.Set(); + m_UpstreamQueue.CompleteAdding(); for (std::thread& Thread : m_UpstreamThreads) { @@ -1536,6 +1547,7 @@ private: UpstreamApplyTasks m_ApplyTasks; std::mutex m_ApplyTasksMutex; std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + Event m_ShutdownEvent; std::vector<std::thread> m_UpstreamThreads; std::thread m_UpstreamUpdatesThread; std::thread m_EndpointMonitorThread; diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index 600c8d053..2fe32112b 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -12,7 +12,6 @@ target("zenserver") if is_plat("windows") then add_ldflags("/subsystem:console,5.02") add_ldflags("/MANIFEST:EMBED") - add_ldflags("/MANIFESTUAC:level='requireAdministrator'") add_ldflags("/LTCG") else del_files("windows/**") diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index d1719000b..4b849eae8 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -82,7 +82,6 @@ <EnableCOMDATFolding>true</EnableCOMDATFolding> <OptimizeReferences>true</OptimizeReferences> <GenerateDebugInformation>true</GenerateDebugInformation> - <UACExecutionLevel>RequireAdministrator</UACExecutionLevel> <DelayLoadDLLs>projectedfslib.dll;shell32.dll</DelayLoadDLLs> </Link> </ItemDefinitionGroup> @@ -99,7 +98,6 @@ <Link> <SubSystem>Console</SubSystem> <GenerateDebugInformation>true</GenerateDebugInformation> - <UACExecutionLevel>RequireAdministrator</UACExecutionLevel> <DelayLoadDLLs>projectedfslib.dll;shell32.dll</DelayLoadDLLs> </Link> </ItemDefinitionGroup> diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 4098954a8..e366a917f 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -89,11 +89,19 @@ ZenServerState::Initialize() { // TODO: there's a small chance of a race here, this logic could be tightened up with a mutex to // ensure only a single process at a time creates the mapping + // TODO: the fallback to Local instead of Global has a flaw where if you start a non-elevated instance + // first then start an elevated instance second you'll have the first instance with a local + // mapping and the second instance with a global mapping. This kind of elevated/non-elevated + // shouldn't be common, but handling for it should be improved in the future. if (HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Global\\ZenMap")) { m_hMapFile = hMap; } + else if (HANDLE hLocalMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Local\\ZenMap")) + { + m_hMapFile = hLocalMap; + } else { // Security attributes to enable any user to access state @@ -108,6 +116,16 @@ ZenServerState::Initialize() if (hMap == NULL) { + hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file + Attrs.Attributes(), // allow anyone to access + PAGE_READWRITE, // read/write access + 0, // maximum object size (high-order DWORD) + m_MaxEntryCount * sizeof(ZenServerEntry), // maximum object size (low-order DWORD) + L"Local\\ZenMap"); // name of mapping object + } + + if (hMap == NULL) + { ThrowLastError("Could not open or create file mapping object for Zen server state"); } @@ -136,6 +154,10 @@ ZenServerState::InitializeReadOnly() { m_hMapFile = hMap; } + else if (HANDLE hLocalMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, L"Local\\ZenMap")) + { + m_hMapFile = hLocalMap; + } else { return false; @@ -283,7 +305,7 @@ ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) if (PidEntry.load(std::memory_order::memory_order_relaxed) == 0) { uint32_t Expected = 0; - if (PidEntry.compare_exchange_strong(Expected, uint16_t(PidToAdd))) + if (PidEntry.compare_exchange_strong(Expected, PidToAdd)) { // Success! return true; |