// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include #endif // ZEN_WITH_TESTS ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC # include # include # include #endif static std::atomic HttpClientRequestIdCounter{0}; namespace zen { using namespace std::literals; namespace detail { static std::atomic_uint32_t TempFileBaseIndex; class TempPayloadFile { public: TempPayloadFile(const TempPayloadFile&) = delete; TempPayloadFile& operator=(const TempPayloadFile&) = delete; TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} ~TempPayloadFile() { ZEN_TRACE_CPU("TempPayloadFile::Close"); try { if (m_FileHandle) { #if ZEN_PLATFORM_WINDOWS // Mark file for deletion when final handle is closed FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); BOOL Success = CloseHandle(m_FileHandle); #else std::error_code Ec; std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec); if (Ec) { ZEN_WARN("Error reported on get file path from handle {} for temp payload unlink operation, reason '{}'", m_FileHandle, Ec.message()); } else { unlink(FilePath.c_str()); } int Fd = int(uintptr_t(m_FileHandle)); bool Success = (close(Fd) == 0); #endif if (!Success) { ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); } m_FileHandle = nullptr; } } catch (const std::exception& Ex) { ZEN_ERROR("Failed deleting temp file {}. Reason '{}'", m_FileHandle, Ex.what()); } } std::error_code Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) { ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | detail::TempFileBaseIndex.fetch_add(1); std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex); #if ZEN_PLATFORM_WINDOWS LPCWSTR lpFileName = FileName.c_str(); const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE); const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr; const DWORD dwCreationDisposition = CREATE_ALWAYS; const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; const HANDLE hTemplateFile = nullptr; const HANDLE FileHandle = CreateFile(lpFileName, dwDesiredAccess, dwShareMode, lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile); if (FileHandle == INVALID_HANDLE_VALUE) { return MakeErrorCodeFromLastError(); } #else // ZEN_PLATFORM_WINDOWS int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC; int Fd = open(FileName.c_str(), OpenFlags, 0666); if (Fd < 0) { return MakeErrorCodeFromLastError(); } fchmod(Fd, 0666); void* FileHandle = (void*)(uintptr_t(Fd)); #endif // ZEN_PLATFORM_WINDOWS m_FileHandle = FileHandle; PrepareFileForScatteredWrite(m_FileHandle, FinalSize); return {}; } std::error_code Write(std::string_view DataString) { ZEN_TRACE_CPU("TempPayloadFile::Write"); const uint8_t* DataPtr = (const uint8_t*)DataString.data(); size_t DataSize = DataString.size(); if (DataSize >= CacheBufferSize) { std::error_code Ec = Flush(); if (Ec) { return Ec; } return AppendData(DataPtr, DataSize); } size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset); memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize); m_CacheBufferOffset += CopySize; DataSize -= CopySize; if (m_CacheBufferOffset == CacheBufferSize) { AppendData(m_CacheBuffer, CacheBufferSize); if (DataSize > 0) { ZEN_ASSERT(DataSize < CacheBufferSize); memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize); } m_CacheBufferOffset = DataSize; } else { ZEN_ASSERT(DataSize == 0); } return {}; } IoBuffer DetachToIoBuffer() { ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); } ZEN_ASSERT(m_FileHandle != nullptr); void* FileHandle = m_FileHandle; IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); Buffer.SetDeleteOnClose(true); m_FileHandle = 0; m_WriteOffset = 0; return Buffer; } IoBuffer BorrowIoBuffer() { ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); } ZEN_ASSERT(m_FileHandle != nullptr); void* FileHandle = m_FileHandle; IoBuffer Buffer(IoBuffer::BorrowedFile, FileHandle, 0, m_WriteOffset); return Buffer; } uint64_t GetSize() const { return m_WriteOffset; } void ResetWritePos(uint64_t WriteOffset) { ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); Flush(); m_WriteOffset = WriteOffset; } private: std::error_code Flush() { ZEN_TRACE_CPU("TempPayloadFile::Flush"); if (m_CacheBufferOffset == 0) { return {}; } std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset); m_CacheBufferOffset = 0; return Res; } std::error_code AppendData(const void* Data, uint64_t Size) { ZEN_TRACE_CPU("TempPayloadFile::AppendData"); ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; while (Size) { const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); uint64_t NumberOfBytesWritten = 0; #if ZEN_PLATFORM_WINDOWS OVERLAPPED Ovl{}; Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu); Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32); DWORD dwNumberOfBytesWritten = 0; BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); if (Success) { NumberOfBytesWritten = static_cast(dwNumberOfBytesWritten); } #else static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); int Fd = int(uintptr_t(m_FileHandle)); int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset); bool Success = (BytesWritten > 0); if (Success) { NumberOfBytesWritten = static_cast(BytesWritten); } #endif if (!Success) { return MakeErrorCodeFromLastError(); } Size -= NumberOfBytesWritten; m_WriteOffset += NumberOfBytesWritten; Data = reinterpret_cast(Data) + NumberOfBytesWritten; } return {}; } void* m_FileHandle; std::uint64_t m_WriteOffset; static constexpr uint64_t CacheBufferSize = 512u * 1024u; uint8_t m_CacheBuffer[CacheBufferSize]; std::uint64_t m_CacheBufferOffset = 0; }; class BufferedReadFileStream { public: BufferedReadFileStream(const BufferedReadFileStream&) = delete; BufferedReadFileStream& operator=(const BufferedReadFileStream&) = delete; BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) : m_FileHandle(FileHandle) , m_FileSize(FileSize) , m_FileEnd(FileOffset + FileSize) , m_BufferSize(Min(BufferSize, FileSize)) , m_FileOffset(FileOffset) { } ~BufferedReadFileStream() { Memory::Free(m_Buffer); } void Read(void* Data, uint64_t Size) { ZEN_ASSERT(Data != nullptr); if (Size > m_BufferSize) { Read(Data, Size, m_FileOffset); m_FileOffset += Size; return; } uint8_t* WritePtr = ((uint8_t*)Data); uint64_t Begin = m_FileOffset; uint64_t End = m_FileOffset + Size; ZEN_ASSERT(m_FileOffset >= m_BufferStart); if (m_FileOffset < m_BufferEnd) { ZEN_ASSERT(m_Buffer != nullptr); uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); Begin += Count; if (Begin == End) { m_FileOffset = End; return; } } if (End == m_FileEnd) { Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); } else { if (!m_Buffer) { m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow(m_BufferSize)); } m_BufferStart = Begin; m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); ZEN_ASSERT(Begin + Count == End); } m_FileOffset = End; } private: void Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) { const uint64_t MaxChunkSize = 1u * 1024 * 1024; std::error_code Ec; ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); if (Ec) { std::error_code DummyEc; throw std::system_error( Ec, fmt::format( "HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", FileOffset, BytesToRead, PathFromHandle(m_FileHandle, DummyEc).generic_string(), m_FileSize)); } } void* m_FileHandle = nullptr; const uint64_t m_FileSize = 0; const uint64_t m_FileEnd = 0; uint64_t m_BufferSize = 0; uint8_t* m_Buffer = nullptr; uint64_t m_BufferStart = 0; uint64_t m_BufferEnd = 0; uint64_t m_FileOffset = 0; }; class CompositeBufferReadStream { public: CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) : m_Data(Data) , m_BufferSize(BufferSize) , m_SegmentIndex(0) , m_BytesLeftInSegment(0) { } uint64_t Read(void* Data, uint64_t Size) { uint64_t Result = 0; uint8_t* WritePtr = (uint8_t*)Data; while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) { if (m_BytesLeftInSegment == 0) { const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Segment.AsIoBuffer().GetFileReference(FileRef)) { m_SegmentDiskBuffer = std::make_unique(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, m_BufferSize); } else { m_SegmentMemoryBuffer = Segment.GetView(); } m_BytesLeftInSegment = Segment.GetSize(); } uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); if (m_SegmentDiskBuffer) { m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); } else { ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); m_SegmentMemoryBuffer.MidInline(BytesToRead); } WritePtr += BytesToRead; Size -= BytesToRead; Result += BytesToRead; m_BytesLeftInSegment -= BytesToRead; if (m_BytesLeftInSegment == 0) { m_SegmentDiskBuffer.reset(); m_SegmentMemoryBuffer.Reset(); m_SegmentIndex++; } } return Result; } private: const CompositeBuffer& m_Data; const uint64_t m_BufferSize; size_t m_SegmentIndex; std::unique_ptr m_SegmentDiskBuffer; MemoryView m_SegmentMemoryBuffer; uint64_t m_BytesLeftInSegment; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// // // CPR helpers static cpr::Body AsCprBody(const CbObject& Obj) { return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize()); } static cpr::Body AsCprBody(const IoBuffer& Obj) { return cpr::Body((const char*)Obj.GetData(), Obj.GetSize()); } ////////////////////////////////////////////////////////////////////////// static HttpClient::Response ResponseWithPayload(std::string_view SessionId, cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results // into buffer directly IoBuffer ResponseBuffer = Payload ? std::move(Payload) : IoBuffer(IoBuffer::Clone, HttpResponse.text.data(), HttpResponse.text.size()); if (auto It = HttpResponse.header.find("Content-Type"); It != HttpResponse.header.end()) { const HttpContentType ContentType = ParseContentType(It->second); ResponseBuffer.SetContentType(ContentType); } if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) { ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse); } return HttpClient::Response{.StatusCode = WorkResponseCode, .ResponsePayload = std::move(ResponseBuffer), .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), .UploadedBytes = gsl::narrow(HttpResponse.uploaded_bytes), .DownloadedBytes = gsl::narrow(HttpResponse.downloaded_bytes), .ElapsedSeconds = HttpResponse.elapsed}; } static HttpClient::Response CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); if (HttpResponse.error) { if (HttpResponse.error.code != cpr::ErrorCode::OPERATION_TIMEDOUT && HttpResponse.error.code != cpr::ErrorCode::CONNECTION_FAILURE && HttpResponse.error.code != cpr::ErrorCode::REQUEST_CANCELLED) { ZEN_WARN("HttpClient client failure (session: {}): {}", SessionId, HttpResponse); } // Client side failure code return HttpClient::Response{ .StatusCode = WorkResponseCode, .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size()), .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), .UploadedBytes = gsl::narrow(HttpResponse.uploaded_bytes), .DownloadedBytes = gsl::narrow(HttpResponse.downloaded_bytes), .ElapsedSeconds = HttpResponse.elapsed, .Error = HttpClient::ErrorContext{.ErrorCode = gsl::narrow(HttpResponse.error.code), .ErrorMessage = HttpResponse.error.message}}; } if (WorkResponseCode == HttpResponseCode::NoContent || (HttpResponse.text.empty() && !Payload)) { return HttpClient::Response{.StatusCode = WorkResponseCode, .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), .UploadedBytes = gsl::narrow(HttpResponse.uploaded_bytes), .DownloadedBytes = gsl::narrow(HttpResponse.downloaded_bytes), .ElapsedSeconds = HttpResponse.elapsed}; } else { return ResponseWithPayload( SessionId, HttpResponse, WorkResponseCode, Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size())); } } static bool ShouldRetry(const cpr::Response& Response) { switch (Response.error.code) { case cpr::ErrorCode::OK: break; case cpr::ErrorCode::INTERNAL_ERROR: case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: case cpr::ErrorCode::NETWORK_SEND_FAILURE: case cpr::ErrorCode::OPERATION_TIMEDOUT: return true; default: return false; } switch ((HttpResponseCode)Response.status_code) { case HttpResponseCode::RequestTimeout: case HttpResponseCode::TooManyRequests: case HttpResponseCode::InternalServerError: case HttpResponseCode::BadGateway: case HttpResponseCode::ServiceUnavailable: case HttpResponseCode::GatewayTimeout: return true; default: return false; } }; static bool ValidatePayload(cpr::Response& Response, std::unique_ptr& PayloadFile) { ZEN_TRACE_CPU("ValidatePayload"); IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer() : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); if (auto ContentLength = Response.header.find("Content-Length"); ContentLength != Response.header.end()) { std::optional ExpectedContentSize = ParseInt(ContentLength->second); if (!ExpectedContentSize.has_value()) { Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Can not parse Content-Length header. Value: '{}'", ContentLength->second)); return false; } if (ExpectedContentSize.value() != ResponseBuffer.GetSize()) { Response.error = cpr::Error( /*CURLE_READ_ERROR*/ 26, fmt::format("Payload size {} does not match Content-Length {}", ResponseBuffer.GetSize(), ContentLength->second)); return false; } } if (Response.status_code == (long)HttpResponseCode::PartialContent) { return true; } if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end()) { IoHash ExpectedPayloadHash; if (IoHash::TryParse(JupiterHash->second, ExpectedPayloadHash)) { IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer); if (PayloadHash != ExpectedPayloadHash) { Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Payload hash {} does not match X-Jupiter-IoHash {}", PayloadHash.ToHexString(), ExpectedPayloadHash.ToHexString())); return false; } } } if (auto ContentType = Response.header.find("Content-Type"); ContentType != Response.header.end()) { if (ContentType->second == "application/x-ue-comp") { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize)) { return true; } else { Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation"); return false; } } if (ContentType->second == "application/x-ue-cb") { if (CbValidateError Error = ValidateCompactBinary(ResponseBuffer.GetView(), CbValidateMode::Default); Error == CbValidateError::None) { return true; } else { Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, fmt::format("Compact binary failed validation: {}", ToString(Error))); return false; } } } return true; } static cpr::Response DoWithRetry( std::string_view SessionId, std::function&& Func, uint8_t RetryCount, std::function&& Validate = [](cpr::Response&) { return true; }) { uint8_t Attempt = 0; cpr::Response Result = Func(); while (Attempt < RetryCount) { if (!ShouldRetry(Result)) { if (Result.error || !IsHttpSuccessCode(Result.status_code)) { break; } if (Validate(Result)) { break; } } Sleep(100 * (Attempt + 1)); Attempt++; ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; } static cpr::Response DoWithRetry(std::string_view SessionId, std::function&& Func, std::unique_ptr& PayloadFile, uint8_t RetryCount) { uint8_t Attempt = 0; cpr::Response Result = Func(); while (Attempt < RetryCount) { if (!ShouldRetry(Result)) { if (Result.error || !IsHttpSuccessCode(Result.status_code)) { break; } if (ValidatePayload(Result, PayloadFile)) { break; } } Sleep(100 * (Attempt + 1)); Attempt++; ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; } static std::pair HeaderContentType(ZenContentType ContentType) { return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType))); } ////////////////////////////////////////////////////////////////////////// struct HttpClient::Impl : public RefCounted { Impl(LoggerRef Log); ~Impl(); // Session allocation struct Session { Session(Impl* InOuter, cpr::Session* InSession) : Outer(InOuter), CprSession(InSession) {} ~Session() { Outer->ReleaseSession(CprSession); } inline cpr::Session* operator->() const { return CprSession; } inline cpr::Response Get() { ZEN_TRACE_CPU("HttpClient::Impl::Get"); cpr::Response Result = CprSession->Get(); ZEN_TRACE("GET {}", Result); return Result; } inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional&& Header = {}) { ZEN_TRACE_CPU("HttpClient::Impl::Download"); if (Header) { CprSession->SetHeaderCallback(std::move(Header.value())); } cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); CprSession->SetHeaderCallback({}); CprSession->SetWriteCallback({}); return Result; } inline cpr::Response Head() { ZEN_TRACE_CPU("HttpClient::Impl::Head"); cpr::Response Result = CprSession->Head(); ZEN_TRACE("HEAD {}", Result); return Result; } inline cpr::Response Put(std::optional&& Read = {}) { ZEN_TRACE_CPU("HttpClient::Impl::Put"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); } cpr::Response Result = CprSession->Put(); ZEN_TRACE("PUT {}", Result); CprSession->SetReadCallback({}); return Result; } inline cpr::Response Post(std::optional&& Read = {}) { ZEN_TRACE_CPU("HttpClient::Impl::Post"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); } cpr::Response Result = CprSession->Post(); ZEN_TRACE("POST {}", Result); CprSession->SetReadCallback({}); return Result; } inline cpr::Response Delete() { ZEN_TRACE_CPU("HttpClient::Impl::Delete"); cpr::Response Result = CprSession->Delete(); ZEN_TRACE("DELETE {}", Result); return Result; } LoggerRef Logger() { return Outer->Logger(); } private: Impl* Outer; cpr::Session* CprSession; Session(Session&&) = delete; Session& operator=(Session&&) = delete; }; Session AllocSession(const std::string_view BaseUrl, const std::string_view Url, const HttpClientSettings& ConnectionSettings, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters, const std::string_view SessionId, std::optional AccessToken); LoggerRef Logger() { return m_Log; } private: LoggerRef m_Log; RwLock m_SessionLock; std::vector m_Sessions; void ReleaseSession(cpr::Session*); }; HttpClient::Impl::Impl(LoggerRef Log) : m_Log(Log) { } HttpClient::Impl::~Impl() { ZEN_TRACE_CPU("HttpClient::Impl::~Impl"); m_SessionLock.WithExclusiveLock([&] { for (auto CprSession : m_Sessions) { delete CprSession; } m_Sessions.clear(); }); } HttpClient::Impl::Session HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view ResourcePath, const HttpClientSettings& ConnectionSettings, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters, const std::string_view SessionId, std::optional AccessToken) { ZEN_TRACE_CPU("HttpClient::Impl::AllocSession"); cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { if (!m_Sessions.empty()) { CprSession = m_Sessions.back(); m_Sessions.pop_back(); } }); if (CprSession == nullptr) { CprSession = new cpr::Session(); CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout); CprSession->SetTimeout(ConnectionSettings.Timeout); if (ConnectionSettings.AssumeHttp2) { CprSession->SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); } } if (!AdditionalHeader->empty()) { CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end())); } if (!SessionId.empty()) { CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}}); } if (AccessToken) { CprSession->UpdateHeader({{"Authorization", AccessToken->Value}}); } if (!Parameters->empty()) { cpr::Parameters Tmp; for (auto It = Parameters->begin(); It != Parameters->end(); It++) { Tmp.Add({It->first, It->second}); } CprSession->SetParameters(Tmp); } else { CprSession->SetParameters({}); } ExtendableStringBuilder<128> UrlBuffer; UrlBuffer << BaseUrl << ResourcePath; CprSession->SetUrl(UrlBuffer.c_str()); return Session(this, CprSession); } void HttpClient::Impl::ReleaseSession(cpr::Session* CprSession) { ZEN_TRACE_CPU("HttpClient::Impl::ReleaseSession"); CprSession->SetUrl({}); CprSession->SetHeader({}); CprSession->SetBody({}); m_SessionLock.WithExclusiveLock([&] { m_Sessions.push_back(CprSession); }); } ////////////////////////////////////////////////////////////////////////// HttpClient::HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings) : m_Log(zen::logging::Get(Connectionsettings.LogCategory)) , m_BaseUri(BaseUri) , m_ConnectionSettings(Connectionsettings) , m_Impl(new Impl(m_Log)) { m_SessionId = GetSessionIdString(); } HttpClient::~HttpClient() { } bool HttpClient::Authenticate() { ZEN_TRACE_CPU("HttpClient::Authenticate"); std::optional Token = GetAccessToken(); if (!Token) { return false; } return Token->IsValid(); } const std::optional HttpClient::GetAccessToken() { ZEN_TRACE_CPU("HttpClient::GetAccessToken"); if (!m_ConnectionSettings.AccessTokenProvider.has_value()) { return {}; } { RwLock::SharedLockScope _(m_AccessTokenLock); if (m_CachedAccessToken.IsValid()) { return m_CachedAccessToken; } } RwLock::ExclusiveLockScope _(m_AccessTokenLock); if (m_CachedAccessToken.IsValid()) { return m_CachedAccessToken; } m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()(); return m_CachedAccessToken; } HttpClient::Response HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::TransactPackage"); Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); // First, list of offered chunks for filtering on the server end std::vector AttachmentsToSend; std::span Attachments = Package.GetAttachments(); const uint32_t RequestId = ++HttpClientRequestIdCounter; auto RequestIdString = fmt::to_string(RequestId); if (Attachments.empty() == false) { CbObjectWriter Writer; Writer.BeginArray("offer"); for (const CbAttachment& Attachment : Attachments) { Writer.AddHash(Attachment.GetHash()); } Writer.EndArray(); BinaryWriter MemWriter; Writer.Save(MemWriter); Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()}); cpr::Response FilterResponse = Sess.Post(); if (FilterResponse.status_code == 200) { IoBuffer ResponseBuffer(IoBuffer::Wrap, FilterResponse.text.data(), FilterResponse.text.size()); CbObject ResponseObject = LoadCompactBinaryObject(ResponseBuffer); for (CbFieldView& Entry : ResponseObject["need"]) { ZEN_ASSERT(Entry.IsHash()); AttachmentsToSend.push_back(Entry.AsHash()); } } } // Prepare package for send CbPackage SendPackage; SendPackage.SetObject(Package.GetObject(), Package.GetObjectHash()); for (const IoHash& AttachmentCid : AttachmentsToSend) { const CbAttachment* Attachment = Package.FindAttachment(AttachmentCid); if (Attachment) { SendPackage.AddAttachment(*Attachment); } else { // This should be an error -- server asked to have something we can't find } } // Transmit package payload CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()}); cpr::Response FilterResponse = Sess.Post(); if (!IsHttpSuccessCode(FilterResponse.status_code)) { return {.StatusCode = HttpResponseCode(FilterResponse.status_code)}; } IoBuffer ResponseBuffer(IoBuffer::Clone, FilterResponse.text.data(), FilterResponse.text.size()); if (auto It = FilterResponse.header.find("Content-Type"); It != FilterResponse.header.end()) { HttpContentType ContentType = ParseContentType(It->second); ResponseBuffer.SetContentType(ContentType); } return {.StatusCode = HttpResponseCode(FilterResponse.status_code), .ResponsePayload = ResponseBuffer}; } ////////////////////////////////////////////////////////////////////////// // // Standard HTTP verbs // HttpClient::Response HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Put"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->SetBody(AsCprBody(Payload)); Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); return Sess.Put(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Put"); return CommonResponse(m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, {{"Content-Length", "0"}}, Parameters, m_SessionId, GetAccessToken()); return Sess.Put(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); return Sess.Get(); }, m_ConnectionSettings.RetryCount, [](cpr::Response& Result) { std::unique_ptr NoTempFile; return ValidatePayload(Result, NoTempFile); })); } HttpClient::Response HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); return Sess.Head(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); return Sess.Delete(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); return Sess.Post(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { return Post(Url, Payload, Payload.GetContentType(), AdditionalHeader); } HttpClient::Response HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Payload.GetFileReference(FileRef)) { uint64_t Offset = 0; detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { size = Min(size, Payload.GetSize() - Offset); Buffer.Read(buffer, size); Offset += size; return true; }; return Sess.Post(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); } Sess->SetBody(AsCprBody(Payload)); return Sess.Post(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->SetBody(AsCprBody(Payload)); Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); return Sess.Post(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader) { return Post(Url, zen::FormatPackageMessageBuffer(Pkg), ZenContentType::kCbPackage, AdditionalHeader); } HttpClient::Response HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Post"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { size = Reader.Read(buffer, size); return true; }; return Sess.Post(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Upload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Payload.GetFileReference(FileRef)) { uint64_t Offset = 0; detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { size = Min(size, Payload.GetSize() - Offset); Buffer.Read(buffer, size); Offset += size; return true; }; return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); } Sess->SetBody(AsCprBody(Payload)); return Sess.Put(); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Upload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { size = Reader.Read(buffer, size); return true; }; return Sess.Put(cpr::ReadCallback(gsl::narrow(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Download"); std::string PayloadString; std::unique_ptr PayloadFile; cpr::Response Response = DoWithRetry( m_SessionId, [&]() { auto GetHeader = [&](std::string header) -> std::pair { size_t DelimiterPos = header.find(':'); if (DelimiterPos != std::string::npos) { std::string Key = header.substr(0, DelimiterPos); constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); std::string Value = header.substr(DelimiterPos + 1); Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); return std::make_pair(Key, Value); } return std::make_pair(header, ""); }; auto DownloadCallback = [&](std::string data, intptr_t) { if (PayloadFile) { ZEN_ASSERT(PayloadString.empty()); std::error_code Ec = PayloadFile->Write(data); if (Ec) { ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); return false; } } else { PayloadString.append(data); } return true; }; uint64_t RequestedContentLength = (uint64_t)-1; if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) { if (RangeIt->second.starts_with("bytes")) { size_t RangeStartPos = RangeIt->second.find('=', 5); if (RangeStartPos != std::string::npos) { RangeStartPos++; size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos); if (RangeSplitPos != std::string::npos) { std::optional RequestedRangeStart = ParseInt(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos)); std::optional RequestedRangeEnd = ParseInt(RangeIt->second.substr(RangeStartPos + 1)); if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) { RequestedContentLength = RequestedRangeEnd.value() - 1; } } } } } cpr::Response Response; { std::vector> ReceivedHeaders; auto HeaderCallback = [&](std::string header, intptr_t) { std::pair Header = GetHeader(header); if (Header.first == "Content-Length"sv) { std::optional ContentLength = ParseInt(Header.second); if (ContentLength.has_value()) { if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique(); std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value()); if (Ec) { ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); PayloadFile.reset(); } } else { PayloadString.reserve(ContentLength.value()); } } } if (!Header.first.empty()) { ReceivedHeaders.emplace_back(std::move(Header)); } return 1; }; Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); for (const std::pair& H : ReceivedHeaders) { Response.header.insert_or_assign(H.first, H.second); } } if (m_ConnectionSettings.AllowResume) { auto SupportsRanges = [](const cpr::Response& Response) -> bool { if (Response.header.find("Content-Range") != Response.header.end()) { return true; } if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) { return It->second == "bytes"sv; } return false; }; auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool { if (ShouldRetry(Response)) { return SupportsRanges(Response); } return false; }; if (ShouldResume(Response)) { auto It = Response.header.find("Content-Length"); if (It != Response.header.end()) { std::vector> ReceivedHeaders; auto HeaderCallback = [&](std::string header, intptr_t) { std::pair Header = GetHeader(header); if (!Header.first.empty()) { ReceivedHeaders.emplace_back(std::move(Header)); } if (Header.first == "Content-Range"sv) { if (Header.second.starts_with("bytes "sv)) { size_t RangeStartEnd = Header.second.find('-', 6); if (RangeStartEnd != std::string::npos) { const auto Start = ParseInt(Header.second.substr(6, RangeStartEnd - 6)); if (Start) { uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); if (Start.value() == DownloadedSize) { return 1; } else if (Start.value() > DownloadedSize) { return 0; } if (PayloadFile) { PayloadFile->ResetWritePos(Start.value()); } else { PayloadString = PayloadString.substr(0, Start.value()); } return 1; } } } return 0; } return 1; }; KeyValueMap HeadersWithRange(AdditionalHeader); do { uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); uint64_t ContentLength = RequestedContentLength; if (ContentLength == uint64_t(-1)) { if (auto ParsedContentLength = ParseInt(It->second); ParsedContentLength.has_value()) { ContentLength = ParsedContentLength.value(); } } std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) { if (RangeIt->second == Range) { // If we didn't make any progress, abort break; } } HeadersWithRange.Entries.insert_or_assign("Range", Range); Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, HeadersWithRange, {}, m_SessionId, GetAccessToken()); Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); for (const std::pair& H : ReceivedHeaders) { Response.header.insert_or_assign(H.first, H.second); } ReceivedHeaders.clear(); } while (ShouldResume(Response)); } } } if (!PayloadString.empty()) { Response.text = std::move(PayloadString); } return Response; }, PayloadFile, m_ConnectionSettings.RetryCount); return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } ////////////////////////////////////////////////////////////////////////// CbObject HttpClient::Response::AsObject() const { // TODO: sanity check the payload format etc if (ResponsePayload) { return LoadCompactBinaryObject(ResponsePayload); } return {}; } CbPackage HttpClient::Response::AsPackage() const { // TODO: sanity checks and error handling if (ResponsePayload) { return ParsePackageMessage(ResponsePayload); } return {}; } std::string_view HttpClient::Response::AsText() const { if (ResponsePayload) { return std::string_view(reinterpret_cast(ResponsePayload.GetData()), ResponsePayload.GetSize()); } return {}; } std::string HttpClient::Response::ToText() const { if (!ResponsePayload) return {}; switch (ResponsePayload.GetContentType()) { case ZenContentType::kCbObject: { zen::ExtendableStringBuilder<1024> ObjStr; zen::CbObject Object{SharedBuffer(ResponsePayload)}; zen::CompactBinaryToJson(Object, ObjStr); return ObjStr.ToString(); } break; case ZenContentType::kCSS: case ZenContentType::kHTML: case ZenContentType::kJavaScript: case ZenContentType::kJSON: case ZenContentType::kText: case ZenContentType::kYAML: return std::string{AsText()}; default: return ""; } } bool HttpClient::Response::IsSuccess() const noexcept { return !Error && IsHttpSuccessCode(StatusCode); } std::string HttpClient::Response::ErrorMessage(std::string_view Prefix) const { if (Error.has_value()) { return fmt::format("{}: {}", Prefix, Error->ErrorMessage); } else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode) { std::string TextResponse = ToText(); return fmt::format("{}{}HTTP error {} {}{}", Prefix, Prefix.empty() ? ""sv : ": "sv, (int)StatusCode, zen::ToString(StatusCode), TextResponse.empty() ? ""sv : fmt::format(" ({})", TextResponse)); } else { return fmt::format("{}{}unknown error", Prefix, Prefix.empty() ? ""sv : ": "sv); } } void HttpClient::Response::ThrowError(std::string_view ErrorPrefix) { if (!IsSuccess()) { throw HttpClientError(ErrorMessage(ErrorPrefix), Error.has_value() ? Error.value().ErrorCode : 0, StatusCode); } } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS namespace testutil { IoHash HashComposite(const CompositeBuffer& Payload) { IoHashStream Hasher; const uint64_t PayloadSize = Payload.GetSize(); std::vector Buffer(64u * 1024u); detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); for (uint64_t Offset = 0; Offset < PayloadSize;) { uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); Stream.Read(Buffer.data(), Count); Hasher.Append(Buffer.data(), Count); Offset += Count; } return Hasher.GetHash(); }; IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) { IoHashStream Hasher; std::vector Buffer(64u * 1024u); detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); for (uint64_t Offset = 0; Offset < FileSize;) { uint64_t Count = Min(64u * 1024u, FileSize - Offset); Stream.Read(Buffer.data(), Count); Hasher.Append(Buffer.data(), Count); Offset += Count; } return Hasher.GetHash(); } } // namespace testutil TEST_CASE("responseformat") { using namespace std::literals; SUBCASE("identity") { BodyLogFormatter _{"abcd"}; CHECK_EQ(_.GetText(), "abcd"sv); } SUBCASE("very long") { std::string_view LongView = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; BodyLogFormatter _{LongView}; CHECK(_.GetText().size() < LongView.size()); CHECK(_.GetText().starts_with("[truncated"sv)); } SUBCASE("invalid text") { std::string_view BadText = "totobaba\xff\xfe"; BodyLogFormatter _{BadText}; CHECK_EQ(_.GetText(), "totobaba"); } } TEST_CASE("BufferedReadFileStream") { ScopedTemporaryDirectory TmpDir; IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); IoBufferFileReference FileRef = {nullptr, 0, 0}; CHECK(DiskBuffer.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); CHECK(Partial.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); CHECK(SmallDiskBuffer.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); } TEST_CASE("CompositeBufferReadStream") { ScopedTemporaryDirectory TmpDir; IoBuffer MemoryBuffer1 = CreateRandomBlob(64); CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), SharedBuffer(std::move(DiskBuffer1)), SharedBuffer(std::move(DiskBuffer2)), SharedBuffer(std::move(MemoryBuffer2)), SharedBuffer(std::move(DiskBuffer3))); CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); } TEST_CASE("httpclient") { using namespace std::literals; SUBCASE("client") {} } void httpclient_forcelink() { } #endif } // namespace zen