diff options
| author | EPICGAMES\thierry.begin <[email protected]> | 2024-04-08 10:43:16 -0400 |
|---|---|---|
| committer | EPICGAMES\thierry.begin <[email protected]> | 2024-04-08 10:43:16 -0400 |
| commit | b35e1258a043cab06950b2453f434861d99b918a (patch) | |
| tree | 695737774fa08ebaa0e32a9f95cb0247c34b3dc3 /src | |
| parent | Add docker support (diff) | |
| parent | Merge pull request #41 from ue-foundation/zs/import-oplog-clean (diff) | |
| download | zen-tb/docker.tar.xz zen-tb/docker.zip | |
Merge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zen into tb/dockertb/docker
Diffstat (limited to 'src')
55 files changed, 917 insertions, 492 deletions
diff --git a/src/transports/winsock/winsock.cpp b/src/transports/winsock/winsock.cpp index 7ee2b5ed1..1c3ee909a 100644 --- a/src/transports/winsock/winsock.cpp +++ b/src/transports/winsock/winsock.cpp @@ -317,7 +317,7 @@ WinsockTransportPlugin::Initialize(TransportServer* ServerInterface) { Connection->HandleConnection(); } - catch (std::exception&) + catch (const std::exception&) { // ZEN_WARN("exception caught in connection loop: {}", Ex.what()); } diff --git a/src/zen/cmds/bench_cmd.cpp b/src/zen/cmds/bench_cmd.cpp index 5c955e980..86b82d838 100644 --- a/src/zen/cmds/bench_cmd.cpp +++ b/src/zen/cmds/bench_cmd.cpp @@ -48,11 +48,11 @@ BenchCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Ok = true; } - catch (zen::bench::util::elevation_required_exception&) + catch (const zen::bench::util::elevation_required_exception&) { ZEN_CONSOLE("purging standby lists requires elevation. Will try launch as elevated process"); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_CONSOLE("ERROR: {}", Ex.what()); } @@ -83,7 +83,7 @@ BenchCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_CONSOLE("ERROR: {}", Ex.what()); } diff --git a/src/zen/cmds/copy_cmd.cpp b/src/zen/cmds/copy_cmd.cpp index 956d9c9d2..f39bfa71c 100644 --- a/src/zen/cmds/copy_cmd.cpp +++ b/src/zen/cmds/copy_cmd.cpp @@ -148,7 +148,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::logic_error("CopyFile failed in an unexpected way"); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ++FailedFileCount; @@ -211,7 +211,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::logic_error("CopyFile failed in an unexpected way"); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_CONSOLE_ERROR("Error: failed to copy '{}' to '{}': '{}'", FromPath, ToPath, Ex.what()); diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 40ba48137..f877a3c51 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -1007,6 +1007,12 @@ ImportOplogCommand::ImportOplogCommand() m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); + m_Options.add_option("", + "", + "gcpath", + "Absolute path to oplog lifetime marker file if we create the oplog", + cxxopts::value(m_GcPath), + "<path>"); m_Options.add_option("", "", "maxblocksize", "Max size for bundled attachments", cxxopts::value(m_MaxBlockSize), "<blocksize>"); m_Options.add_option("", "", @@ -1016,6 +1022,7 @@ ImportOplogCommand::ImportOplogCommand() "<chunksize>"); m_Options.add_option("", "f", "force", "Force import of all attachments", cxxopts::value(m_Force), "<force>"); m_Options.add_option("", "a", "async", "Trigger import but don't wait for completion", cxxopts::value(m_Async), "<async>"); + m_Options.add_option("", "", "clean", "Delete existing target oplog", cxxopts::value(m_Clean), "<clean>"); m_Options.add_option("", "", "ignore-missing-attachments", @@ -1052,12 +1059,12 @@ ImportOplogCommand::ImportOplogCommand() m_Options.add_option("", "", "zen", "Zen service upload address", cxxopts::value(m_ZenUrl), "<url>"); m_Options.add_option("zen", "", "source-project", "Zen source project name", cxxopts::value(m_ZenProjectName), "<sourceprojectid>"); m_Options.add_option("zen", "", "source-oplog", "Zen source oplog name", cxxopts::value(m_ZenOplogName), "<sourceoplogid>"); - m_Options.add_option("zen", "", "clean", "Delete existing target Zen oplog", cxxopts::value(m_ZenClean), "<clean>"); m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); - m_Options.parse_positional({"project", "oplog"}); + m_Options.parse_positional({"project", "oplog", "gcpath"}); + m_Options.positional_help("[<projectid> <oplogid> [<gcpath>]]"); } ImportOplogCommand::~ImportOplogCommand() @@ -1153,7 +1160,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg bool CreateOplog = false; if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) { - if (m_ZenClean) + if (m_Clean) { ZEN_WARN("Deleting oplog '{}/{}'", m_ProjectName, m_OplogName) Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); @@ -1177,8 +1184,13 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg if (CreateOplog) { + IoBuffer OplogPayload; + if (!m_GcPath.empty()) + { + OplogPayload = MakeCbObjectPayload([&](CbObjectWriter& Writer) { Writer.AddString("gcpath"sv, m_GcPath); }); + } ZEN_WARN("Creating oplog '{}/{}'", m_ProjectName, m_OplogName); - if (HttpClient::Response Result = Http.Post(Url); !Result) + if (HttpClient::Response Result = Http.Post(Url, OplogPayload); !Result) { Result.ThrowError("failed creating oplog"sv); return 1; diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 5a3f7281b..215614b01 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -172,11 +172,13 @@ private: std::string m_HostName; std::string m_ProjectName; std::string m_OplogName; + std::string m_GcPath; size_t m_MaxBlockSize = 0; size_t m_MaxChunkEmbedSize = 0; bool m_Force = false; bool m_Async = false; bool m_IgnoreMissingAttachments = false; + bool m_Clean = false; std::string m_CloudUrl; std::string m_CloudNamespace; @@ -191,7 +193,6 @@ private: std::string m_ZenUrl; std::string m_ZenProjectName; std::string m_ZenOplogName; - bool m_ZenClean; std::string m_FileDirectoryPath; std::string m_FileName; diff --git a/src/zen/cmds/serve_cmd.cpp b/src/zen/cmds/serve_cmd.cpp index c8117774b..ea9102b28 100644 --- a/src/zen/cmds/serve_cmd.cpp +++ b/src/zen/cmds/serve_cmd.cpp @@ -91,7 +91,7 @@ ServeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ServerInstance->SetOwnerPid(zen::GetCurrentProcessId()); ServerInstance->SpawnServerAndWait(ServerPort); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_CONSOLE("failed to spawn server on port {}: '{}'", ServerPort, Ex.what()); diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp index c5dd31f5e..14f954064 100644 --- a/src/zen/cmds/up_cmd.cpp +++ b/src/zen/cmds/up_cmd.cpp @@ -176,7 +176,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_DEBUG("Exception caught when requesting shutdown: {}", Ex.what()); } diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 39f3f1f78..4881d44ae 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -72,7 +72,7 @@ ZenCmdBase::ParseOptions(int argc, char** argv) { Result = CmdOptions.parse(argc, argv); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { throw zen::OptionParseException(Ex.what()); } @@ -513,7 +513,7 @@ main(int argc, char** argv) { return CmdInfo.Cmd->Run(GlobalOptions, (int)CommandArgVec.size(), CommandArgVec.data()); } - catch (OptionParseException& Ex) + catch (const OptionParseException& Ex) { std::string help = VerbOptions.help(); @@ -526,7 +526,7 @@ main(int argc, char** argv) printf("Unknown command specified: '%s', exiting\n", SubCommand.c_str()); } - catch (OptionParseException& Ex) + catch (const OptionParseException& Ex) { std::string HelpMessage = Options.help(); @@ -534,13 +534,13 @@ main(int argc, char** argv) return 9; } - catch (std::system_error& Ex) + catch (const std::system_error& Ex) { printf("System Error: %s\n", Ex.what()); return Ex.code() ? Ex.code().value() : 10; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { printf("Error: %s\n", Ex.what()); diff --git a/src/zencore/callstack.cpp b/src/zencore/callstack.cpp new file mode 100644 index 000000000..905ab3d9e --- /dev/null +++ b/src/zencore/callstack.cpp @@ -0,0 +1,221 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/callstack.h> +#include <zencore/thread.h> + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +# include <Dbghelp.h> +#endif + +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# include <execinfo.h> +#endif + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif + +#include <fmt/format.h> + +namespace zen { +#if ZEN_PLATFORM_WINDOWS + +class WinSymbolInit +{ +public: + WinSymbolInit() {} + ~WinSymbolInit() + { + m_CallstackLock.WithExclusiveLock([this]() { + if (m_Initialized) + { + SymCleanup(m_CurrentProcess); + } + }); + } + + bool GetSymbol(void* Frame, SYMBOL_INFO* OutSymbolInfo, DWORD64& OutDisplacement) + { + bool Result = false; + m_CallstackLock.WithExclusiveLock([&]() { + if (!m_Initialized) + { + m_CurrentProcess = GetCurrentProcess(); + if (SymInitialize(m_CurrentProcess, NULL, TRUE) == TRUE) + { + m_Initialized = true; + } + } + if (m_Initialized) + { + if (SymFromAddr(m_CurrentProcess, (DWORD64)Frame, &OutDisplacement, OutSymbolInfo) == TRUE) + { + Result = true; + } + } + }); + return Result; + } + +private: + HANDLE m_CurrentProcess = NULL; + BOOL m_Initialized = FALSE; + RwLock m_CallstackLock; +}; + +static WinSymbolInit WinSymbols; + +#endif + +CallstackFrames* +CreateCallstack(uint32_t FrameCount, void** Frames) noexcept +{ + if (FrameCount == 0) + { + return nullptr; + } + CallstackFrames* Callstack = (CallstackFrames*)malloc(sizeof(CallstackFrames) + sizeof(void*) * FrameCount); + if (Callstack != nullptr) + { + Callstack->FrameCount = FrameCount; + if (FrameCount == 0) + { + Callstack->Frames = nullptr; + } + else + { + Callstack->Frames = (void**)&Callstack[1]; + memcpy(Callstack->Frames, Frames, sizeof(void*) * FrameCount); + } + } + return Callstack; +} + +CallstackFrames* +CloneCallstack(const CallstackFrames* Callstack) noexcept +{ + if (Callstack == nullptr) + { + return nullptr; + } + return CreateCallstack(Callstack->FrameCount, Callstack->Frames); +} + +void +FreeCallstack(CallstackFrames* Callstack) noexcept +{ + if (Callstack != nullptr) + { + free(Callstack); + } +} + +uint32_t +GetCallstack(int FramesToSkip, int FramesToCapture, void* OutAddresses[]) +{ +#if ZEN_PLATFORM_WINDOWS + return (uint32_t)CaptureStackBackTrace(FramesToSkip, FramesToCapture, OutAddresses, 0); +#endif +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + void* Frames[FramesToSkip + FramesToCapture]; + int FrameCount = backtrace(Frames, FramesToSkip + FramesToCapture); + if (FrameCount > FramesToSkip) + { + for (int Index = FramesToSkip; Index < FrameCount; Index++) + { + OutAddresses[Index - FramesToSkip] = Frames[Index]; + } + return (uint32_t)(FrameCount - FramesToSkip); + } + else + { + return 0; + } +#endif +} + +std::vector<std::string> +GetFrameSymbols(uint32_t FrameCount, void** Frames) +{ + std::vector<std::string> FrameSymbols; + if (FrameCount > 0) + { + FrameSymbols.resize(FrameCount); +#if ZEN_PLATFORM_WINDOWS + char SymbolBuffer[sizeof(SYMBOL_INFO) + 1024]; + SYMBOL_INFO* SymbolInfo = (SYMBOL_INFO*)SymbolBuffer; + SymbolInfo->SizeOfStruct = sizeof(SYMBOL_INFO); + SymbolInfo->MaxNameLen = 1023; + DWORD64 Displacement = 0; + for (uint32_t FrameIndex = 0; FrameIndex < FrameCount; FrameIndex++) + { + if (WinSymbols.GetSymbol(Frames[FrameIndex], SymbolInfo, Displacement)) + { + FrameSymbols[FrameIndex] = fmt::format("{}+{:#x} [{:#x}]", SymbolInfo->Name, Displacement, (uintptr_t)Frames[FrameIndex]); + } + } +#endif +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + char** messages = backtrace_symbols(Frames, (int)FrameCount); + if (messages) + { + for (uint32_t FrameIndex = 0; FrameIndex < FrameCount; FrameIndex++) + { + FrameSymbols[FrameIndex] = messages[FrameIndex]; + } + free(messages); + } +#endif + } + return FrameSymbols; +} + +void +FormatCallstack(const CallstackFrames* Callstack, StringBuilderBase& SB) +{ + bool First = true; + for (const std::string& Symbol : GetFrameSymbols(Callstack)) + { + if (!First) + { + SB.Append("\n"); + } + else + { + First = false; + } + SB.Append(Symbol); + } +} + +std::string +CallstackToString(const CallstackFrames* Callstack) +{ + StringBuilder<2048> SB; + FormatCallstack(Callstack, SB); + return SB.ToString(); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("Callstack.Basic") +{ + void* Addresses[4]; + uint32_t FrameCount = GetCallstack(1, 4, Addresses); + CHECK(FrameCount > 0); + std::vector<std::string> Symbols = GetFrameSymbols(FrameCount, Addresses); + for (const std::string& Symbol : Symbols) + { + CHECK(!Symbol.empty()); + } +} + +void +callstack_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 58be65f13..143317e65 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -863,7 +863,7 @@ GetDecoder(CompressionMethod Method) ////////////////////////////////////////////////////////////////////////// bool -BufferHeader::IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +ReadHeader(const CompositeBuffer& CompressedData, BufferHeader& OutHeader, UniqueBuffer* OutHeaderData) { const uint64_t CompressedDataSize = CompressedData.GetSize(); if (CompressedDataSize < sizeof(BufferHeader)) @@ -871,61 +871,89 @@ BufferHeader::IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, return false; } - const size_t StackBufferSize = 256; - uint8_t StackBuffer[StackBufferSize]; - uint64_t ReadSize = Min(CompressedDataSize, StackBufferSize); - BufferHeader* Header = reinterpret_cast<BufferHeader*>(StackBuffer); + const size_t HeaderBufferSize = 1024; + uint8_t HeaderBuffer[HeaderBufferSize]; + uint64_t ReadSize = Min(CompressedDataSize, HeaderBufferSize); + uint64_t FirstSegmentSize = CompressedData.GetSegments()[0].GetSize(); + if (FirstSegmentSize >= sizeof(BufferHeader)) { - CompositeBuffer::Iterator It; - CompressedData.CopyTo(MutableMemoryView(StackBuffer, StackBuffer + StackBufferSize), It); + // Keep first read inside first segment if possible + ReadSize = Min(ReadSize, FirstSegmentSize); } - Header->ByteSwap(); - if (Header->Magic != BufferHeader::ExpectedMagic) + + MutableMemoryView HeaderMemory(HeaderBuffer, &HeaderBuffer[ReadSize]); + CompositeBuffer::Iterator It = CompressedData.GetIterator(0); + CompressedData.CopyTo(HeaderMemory, It); + + OutHeader = *reinterpret_cast<BufferHeader*>(HeaderMemory.GetData()); + OutHeader.ByteSwap(); + if (OutHeader.Magic != BufferHeader::ExpectedMagic) { return false; } - - const BaseDecoder* const Decoder = GetDecoder(Header->Method); + if (OutHeader.TotalCompressedSize > CompressedDataSize) + { + return false; + } + const BaseDecoder* const Decoder = GetDecoder(OutHeader.Method); if (!Decoder) { return false; } - - uint32_t Crc32 = Header->Crc32; - OutRawHash = IoHash::FromBLAKE3(Header->RawHash); - OutRawSize = Header->TotalRawSize; - uint64_t HeaderSize = Decoder->GetHeaderSize(*Header); - - if (Header->TotalCompressedSize > CompressedDataSize) + uint64_t FullHeaderSize = Decoder->GetHeaderSize(OutHeader); + if (FullHeaderSize > CompressedDataSize) { return false; } - - Header->ByteSwap(); - - if (HeaderSize > ReadSize) + if (OutHeaderData) { - UniqueBuffer HeaderCopy = UniqueBuffer::Alloc(HeaderSize); - CompositeBuffer::Iterator It; - CompressedData.CopyTo(HeaderCopy.GetMutableView(), It); - const MemoryView HeaderView = HeaderCopy.GetView(); - if (Crc32 != BufferHeader::CalculateCrc32(HeaderView)) + *OutHeaderData = UniqueBuffer::Alloc(FullHeaderSize); + MutableMemoryView RemainingHeaderView = OutHeaderData->GetMutableView().CopyFrom(HeaderMemory.Mid(0, FullHeaderSize)); + if (!RemainingHeaderView.IsEmpty()) + { + CompressedData.CopyTo(RemainingHeaderView, It); + } + if (OutHeader.Crc32 != BufferHeader::CalculateCrc32(OutHeaderData->GetView())) + { + return false; + } + } + else if (FullHeaderSize < ReadSize) + { + if (OutHeader.Crc32 != BufferHeader::CalculateCrc32(HeaderMemory.Mid(0, FullHeaderSize))) { return false; } } else { - MemoryView FullHeaderView(StackBuffer, StackBuffer + HeaderSize); - if (Crc32 != BufferHeader::CalculateCrc32(FullHeaderView)) + UniqueBuffer HeaderData = UniqueBuffer::Alloc(FullHeaderSize); + MutableMemoryView RemainingHeaderView = HeaderData.GetMutableView().CopyFrom(HeaderMemory.Mid(0, FullHeaderSize)); + if (!RemainingHeaderView.IsEmpty()) + { + CompressedData.CopyTo(RemainingHeaderView, It); + } + if (OutHeader.Crc32 != BufferHeader::CalculateCrc32(HeaderData.GetView())) { return false; } } - return true; } +bool +BufferHeader::IsValid(const CompositeBuffer& CompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) +{ + detail::BufferHeader Header; + if (ReadHeader(CompressedData, Header, nullptr)) + { + OutRawHash = IoHash::FromBLAKE3(Header.RawHash); + OutRawSize = Header.TotalRawSize; + return true; + } + return false; +} + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// static bool @@ -1097,7 +1125,11 @@ ValidBufferOrEmpty(BufferType&& CompressedData, IoHash& OutRawHash, uint64_t& Ou } CompositeBuffer -GetCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize) +GetCompressedRange(const BufferHeader& Header, + MemoryView HeaderRawData, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize) { if (Header.TotalRawSize < RawOffset + RawSize) { @@ -1118,9 +1150,7 @@ GetCompressedRange(const BufferHeader& Header, const CompositeBuffer& Compressed } else { - UniqueBuffer BlockSizeBuffer; - MemoryView BlockSizeView = - CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + MemoryView BlockSizeView = HeaderRawData.Mid(sizeof(Header), Header.BlockCount * sizeof(uint32_t)); std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; @@ -1179,7 +1209,11 @@ GetCompressedRange(const BufferHeader& Header, const CompositeBuffer& Compressed } CompositeBuffer -CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize) +CopyCompressedRange(const BufferHeader& Header, + MemoryView HeaderRawData, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize) { if (Header.TotalRawSize < RawOffset + RawSize) { @@ -1204,9 +1238,7 @@ CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& Compresse } else { - UniqueBuffer BlockSizeBuffer; - MemoryView BlockSizeView = - CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + MemoryView BlockSizeView = HeaderRawData.Mid(sizeof(Header), Header.BlockCount * sizeof(uint32_t)); std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; @@ -1410,26 +1442,28 @@ CompressedBuffer::DecodeRawHash() const CompressedBuffer CompressedBuffer::CopyRange(uint64_t RawOffset, uint64_t RawSize) const { - using namespace detail; - const BufferHeader Header = BufferHeader::Read(CompressedData); - const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; - - CompressedBuffer Range; - Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, TotalRawSize); - + CompressedBuffer Range; + detail::BufferHeader Header; + UniqueBuffer RawHeaderData; + if (ReadHeader(CompressedData, Header, &RawHeaderData)) + { + const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; + Range.CompressedData = CopyCompressedRange(Header, RawHeaderData.GetView(), CompressedData, RawOffset, TotalRawSize); + } return Range; } CompressedBuffer CompressedBuffer::GetRange(uint64_t RawOffset, uint64_t RawSize) const { - using namespace detail; - const BufferHeader Header = BufferHeader::Read(CompressedData); - const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; - - CompressedBuffer Range; - Range.CompressedData = GetCompressedRange(Header, CompressedData, RawOffset, TotalRawSize); - + CompressedBuffer Range; + detail::BufferHeader Header; + UniqueBuffer RawHeaderData; + if (ReadHeader(CompressedData, Header, &RawHeaderData)) + { + const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; + Range.CompressedData = GetCompressedRange(Header, RawHeaderData.GetView(), CompressedData, RawOffset, TotalRawSize); + } return Range; } diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 3e94b550f..ca2b3101f 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -730,7 +730,7 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop throw std::runtime_error("CopyFile failed in an unexpected way"); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ++FailedFileCount; diff --git a/src/zencore/include/zencore/callstack.h b/src/zencore/include/zencore/callstack.h new file mode 100644 index 000000000..02ba8b3c3 --- /dev/null +++ b/src/zencore/include/zencore/callstack.h @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/string.h> + +#include <string> +#include <vector> + +namespace zen { + +struct CallstackFrames +{ + uint32_t FrameCount; + void** Frames; +}; + +CallstackFrames* CreateCallstack(uint32_t FrameCount, void** Frames) noexcept; +CallstackFrames* CloneCallstack(const CallstackFrames* Callstack) noexcept; +void FreeCallstack(CallstackFrames* Callstack) noexcept; + +uint32_t GetCallstack(int FramesToSkip, int FramesToCapture, void* OutAddresses[]); +std::vector<std::string> GetFrameSymbols(uint32_t FrameCount, void** Frames); +inline std::vector<std::string> +GetFrameSymbols(const CallstackFrames* Callstack) +{ + return GetFrameSymbols(Callstack ? Callstack->FrameCount : 0, Callstack ? Callstack->Frames : nullptr); +} + +void FormatCallstack(const CallstackFrames* Callstack, StringBuilderBase& SB); +std::string CallstackToString(const CallstackFrames* Callstack); + +void callstack_forcelink(); // internal + +} // namespace zen diff --git a/src/zencore/include/zencore/scopeguard.h b/src/zencore/include/zencore/scopeguard.h index d04c8ed9c..3fd0564f6 100644 --- a/src/zencore/include/zencore/scopeguard.h +++ b/src/zencore/include/zencore/scopeguard.h @@ -21,7 +21,11 @@ public: { m_guardFunc(); } - catch (std::exception& Ex) + catch (const AssertException& Ex) + { + ZEN_ERROR("Assert exception in scope guard: {}", Ex.FullDescription()); + } + catch (const std::exception& Ex) { ZEN_ERROR("scope guard threw exception: '{}'", Ex.what()); } diff --git a/src/zencore/include/zencore/zencore.h b/src/zencore/include/zencore/zencore.h index 1a9060e29..cd1a34c7b 100644 --- a/src/zencore/include/zencore/zencore.h +++ b/src/zencore/include/zencore/zencore.h @@ -24,34 +24,63 @@ #endif namespace zen { + +struct CallstackFrames; + class AssertException : public std::logic_error { public: - inline explicit AssertException(const char* Msg) : std::logic_error(Msg) {} + using _Mybase = std::logic_error; + + virtual ~AssertException() noexcept; + + inline AssertException(const char* Msg, struct CallstackFrames* Callstack) noexcept : _Mybase(Msg), _Callstack(Callstack) {} + + AssertException(const AssertException& Rhs) noexcept; + + AssertException(AssertException&& Rhs) noexcept; + + AssertException& operator=(const AssertException& Rhs) noexcept; + + std::string FullDescription() const noexcept; + + struct CallstackFrames* _Callstack = nullptr; }; struct AssertImpl { + ZEN_FORCENOINLINE ZEN_DEBUG_SECTION AssertImpl() : PrevAssertImpl(CurrentAssertImpl) { CurrentAssertImpl = this; } + virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~AssertImpl() { CurrentAssertImpl = PrevAssertImpl; } + static void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ExecAssert [[noreturn]] (const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) { - CurrentAssertImpl->OnAssert(Filename, LineNumber, FunctionName, Msg); - throw AssertException{Msg}; + AssertImpl* AssertImpl = CurrentAssertImpl; + while (AssertImpl) + { + AssertImpl->OnAssert(Filename, LineNumber, FunctionName, Msg); + AssertImpl = AssertImpl->PrevAssertImpl; + } + ThrowAssertException(Filename, LineNumber, FunctionName, Msg); } -protected: virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) { - (void(Filename)); - (void(LineNumber)); - (void(FunctionName)); - (void(Msg)); + ZEN_UNUSED(Filename); + ZEN_UNUSED(LineNumber); + ZEN_UNUSED(FunctionName); + ZEN_UNUSED(Msg); } - static AssertImpl DefaultAssertImpl; + +protected: + static void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ThrowAssertException + [[noreturn]] (const char* Filename, int LineNumber, const char* FunctionName, const char* Msg); static AssertImpl* CurrentAssertImpl; + static AssertImpl DefaultAssertImpl; + AssertImpl* PrevAssertImpl = nullptr; }; } // namespace zen diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp index 86c08cda9..d26d0dd1e 100644 --- a/src/zencore/jobqueue.cpp +++ b/src/zencore/jobqueue.cpp @@ -69,7 +69,7 @@ public: Stop(); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("Failed shutting down jobqueue. Reason: '{}'", Ex.what()); } @@ -106,7 +106,7 @@ public: }); return {.Id = NewJobId}; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { WorkerCounter.CountDown(); QueueLock.WithExclusiveLock([&]() { @@ -359,7 +359,18 @@ public: CompletedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); }); } - catch (std::exception& Ex) + catch (const AssertException& Ex) + { + ZEN_DEBUG("Background job {}:'{}' asserted. Reason: {}", CurrentJob->Id.Id, CurrentJob->Name, Ex.FullDescription()); + QueueLock.WithExclusiveLock([&]() { + CurrentJob->State.AbortReason = Ex.FullDescription(); + CurrentJob->EndTick = JobClock::Now(); + CurrentJob->WorkerThreadId = 0; + RunningJobs.erase(CurrentJob->Id.Id); + AbortedJobs.insert_or_assign(CurrentJob->Id.Id, std::move(CurrentJob)); + }); + } + catch (const std::exception& Ex) { ZEN_DEBUG("Background job {}:'{}' aborted. Reason: '{}'", CurrentJob->Id.Id, CurrentJob->Name, Ex.what()); QueueLock.WithExclusiveLock([&]() { diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 16b2310ff..f41c13bf6 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -186,7 +186,13 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) ZEN_TRACE_CPU_FLUSH("AsyncWork"); Work->Execute(); } - catch (std::exception& e) + catch (const AssertException& Ex) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) { Work->m_Exception = std::current_exception(); @@ -234,7 +240,13 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work) ZEN_TRACE_CPU_FLUSH("SyncWork"); Work->Execute(); } - catch (std::exception& e) + catch (const AssertException& Ex) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) { Work->m_Exception = std::current_exception(); diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua index e6102679d..5f2d95e16 100644 --- a/src/zencore/xmake.lua +++ b/src/zencore/xmake.lua @@ -53,6 +53,7 @@ target('zencore') if is_plat("windows") then add_syslinks("Advapi32") + add_syslinks("Dbghelp") add_syslinks("Shell32") add_syslinks("User32") add_syslinks("crypt32") diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index c97f5e5ca..c4fcc89de 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -6,12 +6,9 @@ # include <zencore/windows.h> #endif -#if ZEN_PLATFORM_LINUX -# include <pthread.h> -#endif - #include <zencore/assertfmt.h> #include <zencore/blake3.h> +#include <zencore/callstack.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -55,10 +52,59 @@ ExecAssertFmt(const char* Filename, int LineNumber, const char* FunctionName, st namespace zen { +AssertException::AssertException(const AssertException& Rhs) noexcept : _Mybase(Rhs), _Callstack(CloneCallstack(Rhs._Callstack)) +{ +} + +AssertException::AssertException(AssertException&& Rhs) noexcept : _Mybase(Rhs), _Callstack(Rhs._Callstack) +{ + Rhs._Callstack = nullptr; +} + +AssertException::~AssertException() noexcept +{ + FreeCallstack(_Callstack); +} + +AssertException& +AssertException::operator=(const AssertException& Rhs) noexcept +{ + _Mybase::operator=(Rhs); + + CallstackFrames* Callstack = CloneCallstack(Rhs._Callstack); + std::swap(_Callstack, Callstack); + FreeCallstack(Callstack); + return *this; +} + +std::string +AssertException::FullDescription() const noexcept +{ + if (_Callstack) + { + return fmt::format("'{}'\n{}", what(), CallstackToString(_Callstack)); + } + return what(); +} + +void +AssertImpl::ThrowAssertException(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) +{ + ZEN_UNUSED(FunctionName); + fmt::basic_memory_buffer<char, 2048> Message; + auto Appender = fmt::appender(Message); + fmt::format_to(Appender, "{}({}): {}", Filename, LineNumber, Msg); + Message.push_back('\0'); + + void* Frames[8]; + uint32_t FrameCount = GetCallstack(3, 8, Frames); + throw AssertException(Message.data(), CreateCallstack(FrameCount, Frames)); +} + void refcount_forcelink(); +AssertImpl* AssertImpl::CurrentAssertImpl = nullptr; AssertImpl AssertImpl::DefaultAssertImpl; -AssertImpl* AssertImpl::CurrentAssertImpl = &AssertImpl::DefaultAssertImpl; ////////////////////////////////////////////////////////////////////////// @@ -138,6 +184,7 @@ void zencore_forcelinktests() { zen::blake3_forcelink(); + zen::callstack_forcelink(); zen::compositebuffer_forcelink(); zen::compress_forcelink(); zen::crypto_forcelink(); @@ -174,24 +221,24 @@ TEST_SUITE_BEGIN("core.assert"); TEST_CASE("Assert.Default") { - bool A = true; - bool B = false; - CHECK_THROWS_WITH(ZEN_ASSERT(A == B), "A == B"); + bool A = true; + bool B = false; + std::string Expected = fmt::format("{}({}): {}", __FILE__, __LINE__ + 1, "A == B"); + CHECK_THROWS_WITH(ZEN_ASSERT(A == B), Expected.c_str()); } TEST_CASE("Assert.Format") { - bool A = true; - bool B = false; - CHECK_THROWS_WITH(ZEN_ASSERT_FORMAT(A == B, "{} == {}", A, B), "assert(A == B) failed: true == false"); + bool A = true; + bool B = false; + std::string Expected = fmt::format("{}({}): {}", __FILE__, __LINE__ + 1, "assert(A == B) failed: true == false"); + CHECK_THROWS_WITH(ZEN_ASSERT_FORMAT(A == B, "{} == {}", A, B), Expected.c_str()); } TEST_CASE("Assert.Custom") { struct MyAssertImpl : AssertImpl { - ZEN_FORCENOINLINE ZEN_DEBUG_SECTION MyAssertImpl() : PrevAssertImpl(CurrentAssertImpl) { CurrentAssertImpl = this; } - virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~MyAssertImpl() { CurrentAssertImpl = PrevAssertImpl; } virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename, int LineNumber, const char* FunctionName, @@ -202,7 +249,7 @@ TEST_CASE("Assert.Custom") FuncName = FunctionName; Message = Msg; } - AssertImpl* PrevAssertImpl; + AssertImpl* PrevAssertImpl = nullptr; const char* AssertFileName = nullptr; int Line = -1; @@ -213,13 +260,47 @@ TEST_CASE("Assert.Custom") MyAssertImpl MyAssert; bool A = true; bool B = false; - CHECK_THROWS_WITH(ZEN_ASSERT(A == B), "A == B"); + CHECK_THROWS_WITH(ZEN_ASSERT(A == B), std::string(fmt::format("{}({}): {}", __FILE__, __LINE__, "A == B")).c_str()); CHECK(MyAssert.AssertFileName != nullptr); CHECK(MyAssert.Line != -1); CHECK(MyAssert.FuncName != nullptr); CHECK(strcmp(MyAssert.Message, "A == B") == 0); } +TEST_CASE("Assert.Callstack") +{ + try + { + ZEN_ASSERT(false); + } + catch (const AssertException& Assert) + { + ZEN_INFO("Assert failed: {}", Assert.what()); + CHECK(Assert._Callstack->FrameCount > 0); + CHECK(Assert._Callstack->Frames != nullptr); + ZEN_INFO("Callstack:\n{}", CallstackToString(Assert._Callstack)); + } + + WorkerThreadPool Pool(1); + auto Task = Pool.EnqueueTask(std::packaged_task<int()>{[] { + ZEN_ASSERT(false); + return 1; + }}); + + try + { + (void)Task.get(); + CHECK(false); + } + catch (const AssertException& Assert) + { + ZEN_INFO("Assert in future: {}", Assert.what()); + CHECK(Assert._Callstack->FrameCount > 0); + CHECK(Assert._Callstack->Frames != nullptr); + ZEN_INFO("Callstack:\n{}", CallstackToString(Assert._Callstack)); + } +} + TEST_SUITE_END(); #endif diff --git a/src/zenhttp/auth/authmgr.cpp b/src/zenhttp/auth/authmgr.cpp index 18568a21d..a520e8fd1 100644 --- a/src/zenhttp/auth/authmgr.cpp +++ b/src/zenhttp/auth/authmgr.cpp @@ -295,7 +295,7 @@ private: } } } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_ERROR("(de)serialize state FAILED, reason '{}'", Err.what()); @@ -367,7 +367,7 @@ private: ZEN_WARN("save auth state FAILED, reason '{}'", Reason.value()); } } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("serialize state FAILED, reason '{}'", Err.what()); } diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 9811e5814..262785a0a 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -51,16 +51,6 @@ AsCprBody(const IoBuffer& Obj) return cpr::Body((const char*)Obj.GetData(), Obj.GetSize()); } -static cpr::Body -AsCprBody(const CompositeBuffer& Buffers) -{ - SharedBuffer Buffer = Buffers.Flatten(); - - // This is super inefficient, should be fixed - std::string String{(const char*)Buffer.GetData(), Buffer.GetSize()}; - return cpr::Body{std::move(String)}; -} - ////////////////////////////////////////////////////////////////////////// static HttpClient::Response @@ -221,10 +211,15 @@ struct HttpClient::Impl : public RefCounted CprSession->SetReadCallback({}); return Result; } - inline cpr::Response Post() + inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {}) { + if (Read) + { + CprSession->SetReadCallback(std::move(Read.value())); + } cpr::Response Result = CprSession->Post(); ZEN_TRACE("POST {}", Result); + CprSession->SetReadCallback({}); return Result; } inline cpr::Response Delete() @@ -384,7 +379,7 @@ public: m_FileHandle = nullptr; } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("Failed deleting temp file {}. Reason '{}'", m_FileHandle, Ex.what()); } @@ -724,6 +719,12 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons HttpClient::Response HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { + return Post(Url, Payload, Payload.GetContentType(), AdditionalHeader); +} + +HttpClient::Response +HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) +{ ZEN_TRACE_CPU("HttpClient::PostWithPayload"); return CommonResponse(DoWithRetry( @@ -732,7 +733,7 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + Sess->UpdateHeader({HeaderContentType(ContentType)}); return Sess.Post(); }, m_ConnectionSettings.RetryCount)); @@ -758,17 +759,30 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi HttpClient::Response HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader) { - ZEN_TRACE_CPU("HttpClient::PostPackage"); + return Post(Url, zen::FormatPackageMessageBuffer(Pkg), ZenContentType::kCbPackage, AdditionalHeader); +} + +HttpClient::Response +HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) +{ + ZEN_TRACE_CPU("HttpClient::Post"); return CommonResponse(DoWithRetry( [&]() { - CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); - + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Message)); - Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)}); - return Sess.Post(); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); } diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index f3559f214..8318e3679 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -149,8 +149,16 @@ public: [[nodiscard]] Response Delete(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Post(std::string_view Url, const KeyValueMap& AdditionalHeader = {}, const KeyValueMap& Parameters = {}); [[nodiscard]] Response Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, + const IoBuffer& Payload, + ZenContentType ContentType, + const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Post(std::string_view Url, CbPackage Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, + const CompositeBuffer& Payload, + ZenContentType ContentType, + const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Upload(std::string_view Url, const CompositeBuffer& Payload, diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index de71eb0a7..cddbe1ae2 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -476,7 +476,15 @@ HttpServerConnection::HandleRequest() { Service->HandleRequest(Request); } - catch (std::system_error& SystemError) + catch (const AssertException& AssertEx) + { + // Drop any partially formatted response + Request.m_Response.reset(); + + ZEN_ERROR("Caught assert exception while handling request: {}", AssertEx.FullDescription()); + Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, AssertEx.FullDescription()); + } + catch (const std::system_error& SystemError) { // Drop any partially formatted response Request.m_Response.reset(); @@ -491,14 +499,14 @@ HttpServerConnection::HandleRequest() Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { // Drop any partially formatted response Request.m_Response.reset(); Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, BadAlloc.what()); } - catch (std::exception& ex) + catch (const std::exception& ex) { // Drop any partially formatted response Request.m_Response.reset(); @@ -958,7 +966,11 @@ HttpAsioServerImpl::Start(uint16_t Port, bool ForceLooopback, int ThreadCount) { m_IoService.run(); } - catch (std::exception& e) + catch (const AssertException& AssertEx) + { + ZEN_ERROR("Assert caught in asio event loop: {}", AssertEx.FullDescription()); + } + catch (const std::exception& e) { ZEN_ERROR("Exception caught in asio event loop: '{}'", e.what()); } @@ -1075,7 +1087,7 @@ HttpAsioServer::Close() { m_Impl->Stop(); } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_WARN("Caught exception stopping http asio server: {}", ex.what()); } diff --git a/src/zenhttp/servers/httpparser.cpp b/src/zenhttp/servers/httpparser.cpp index 0a1c5686a..b848a5243 100644 --- a/src/zenhttp/servers/httpparser.cpp +++ b/src/zenhttp/servers/httpparser.cpp @@ -372,7 +372,12 @@ HttpRequestParser::OnMessageComplete() ResetState(); return 0; } - catch (std::system_error& SystemError) + catch (const AssertException& AssertEx) + { + ZEN_WARN("Assert caught when processing http request: {}", AssertEx.FullDescription()); + return 1; + } + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { @@ -389,13 +394,13 @@ HttpRequestParser::OnMessageComplete() ResetState(); return 1; } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("out of memory when processing http request: '{}'", BadAlloc.what()); ResetState(); return 1; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("failed processing http request: '{}'", Ex.what()); ResetState(); diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index 4a2615133..09cd76f3e 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -386,7 +386,7 @@ HttpPluginConnectionHandler::HandleRequest() { Service->HandleRequest(Request); } - catch (std::system_error& SystemError) + catch (const std::system_error& SystemError) { // Drop any partially formatted response Request.m_Response.reset(); @@ -401,14 +401,14 @@ HttpPluginConnectionHandler::HandleRequest() Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { // Drop any partially formatted response Request.m_Response.reset(); Request.WriteResponse(HttpResponseCode::InsufficientStorage, HttpContentType::kText, BadAlloc.what()); } - catch (std::exception& ex) + catch (const std::exception& ex) { // Drop any partially formatted response Request.m_Response.reset(); @@ -691,13 +691,13 @@ HttpPluginServerImpl::Initialize(int BasePort, std::filesystem::path DataDir) { Plugin->Initialize(this); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception caught during plugin initialization: {}", Ex.what()); } } } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_WARN("Caught exception starting http plugin server: {}", ex.what()); } @@ -723,7 +723,7 @@ HttpPluginServerImpl::Close() { Plugin->Shutdown(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception caught during plugin shutdown: {}", Ex.what()); } @@ -733,7 +733,7 @@ HttpPluginServerImpl::Close() m_Plugins.clear(); } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_WARN("Caught exception stopping http plugin server: {}", ex.what()); } diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 4b812a127..2b97e3f25 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -873,7 +873,12 @@ HttpAsyncWorkRequest::AsyncWorkItem::Execute() new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv)); } } - catch (std::exception& Ex) + catch (const AssertException& AssertEx) + { + return (void)Tx.IssueNextRequest( + new HttpMessageResponseRequest(Tx, 500, fmt::format("Assert thrown in async work: '{}", AssertEx.FullDescription()))); + } + catch (const std::exception& Ex) { return (void)Tx.IssueNextRequest( new HttpMessageResponseRequest(Tx, 500, fmt::format("Exception thrown in async work: {}", Ex.what()))); @@ -1485,7 +1490,11 @@ HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler ZEN_WARN("IssueRequest() failed: {}", ErrorCode.message()); } - catch (std::exception& Ex) + catch (const AssertException& AssertEx) + { + ZEN_ERROR("Assert thrown in IssueNextRequest(): {}", AssertEx.FullDescription()); + } + catch (const std::exception& Ex) { ZEN_ERROR("exception caught in IssueNextRequest(): {}", Ex.what()); } @@ -1995,7 +2004,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT // Unable to route return new HttpMessageResponseRequest(Transaction(), 404, "No suitable route found"sv); } - catch (std::system_error& SystemError) + catch (const AssertException& AssertEx) + { + ZEN_ERROR("Caught assert exception while handling request: {}", AssertEx.FullDescription()); + return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InternalServerError, AssertEx.FullDescription()); + } + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code()) || IsOOD(SystemError.code())) { @@ -2005,11 +2019,11 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what()); return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InternalServerError, SystemError.what()); } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InsufficientStorage, BadAlloc.what()); } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_ERROR("Caught exception while handling request: '{}'", ex.what()); return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InternalServerError, ex.what()); diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp index a9a782821..96a15518c 100644 --- a/src/zenhttp/transports/asiotransport.cpp +++ b/src/zenhttp/transports/asiotransport.cpp @@ -426,7 +426,7 @@ AsioTransportPlugin::Initialize(TransportServer* ServerInterface) { m_IoService.run(); } - catch (std::exception& e) + catch (const std::exception& e) { ZEN_ERROR("exception caught in asio event loop: {}", e.what()); } diff --git a/src/zenhttp/transports/winsocktransport.cpp b/src/zenhttp/transports/winsocktransport.cpp index 7407c55dd..8c82760bb 100644 --- a/src/zenhttp/transports/winsocktransport.cpp +++ b/src/zenhttp/transports/winsocktransport.cpp @@ -309,7 +309,7 @@ SocketTransportPluginImpl::Initialize(TransportServer* ServerInterface) { Connection->HandleConnection(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception caught in connection loop: {}", Ex.what()); } diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 412a2d26a..325b15e3f 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -1228,12 +1228,10 @@ TEST_CASE("zcache.rpc") if (Result.status_code == 200) { CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())); - if (!Response.IsNull()) - { - OutResult.Response = std::move(Response); - CHECK(OutResult.Result.Parse(OutResult.Response)); - OutResult.Success = true; - } + CHECK(!Response.IsNull()); + OutResult.Response = std::move(Response); + CHECK(OutResult.Result.Parse(OutResult.Response)); + OutResult.Success = true; } return OutResult; diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 8093a0735..75ff03912 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -603,7 +603,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, EmitStats("cas", Stats.CasStats); EmitStats("project", Stats.ProjectStats); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception in disk stats gathering for '{}': {}", m_ServerOptions.DataDir, Ex.what()); } @@ -622,7 +622,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, Obj.EndArray(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception in state gathering for '{}': {}", m_ServerOptions.SystemRootDir, Ex.what()); } diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 00581d758..aa0eedb0e 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -128,7 +128,7 @@ ReadAllCentralManifests(const std::filesystem::path& SystemRoot) ZEN_WARN("failed to load manifest '{}': {}", File, ToString(ValidateError)); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("failed to load manifest '{}': {}", File, Ex.what()); } @@ -1004,7 +1004,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) { Result = options.parse(argc, argv); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { throw zen::OptionParseException(Ex.what()); } @@ -1069,7 +1069,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) ValidateOptions(ServerOptions); } - catch (zen::OptionParseException& e) + catch (const zen::OptionParseException& e) { ZEN_CONSOLE_ERROR("Error parsing zenserver arguments: {}\n\n{}", e.what(), options.help()); diff --git a/src/zenserver/config/luaconfig.cpp b/src/zenserver/config/luaconfig.cpp index cdc808cf6..f742fa34a 100644 --- a/src/zenserver/config/luaconfig.cpp +++ b/src/zenserver/config/luaconfig.cpp @@ -280,7 +280,7 @@ Options::Parse(const std::filesystem::path& Path, const cxxopts::ParseResult& Cm config(); } - catch (std::exception& e) + catch (const std::exception& e) { throw std::runtime_error(fmt::format("failed to load config script ('{}'): {}", Path, e.what()).c_str()); } diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp index 1a10782e9..f0aec98ab 100644 --- a/src/zenserver/diag/diagsvcs.cpp +++ b/src/zenserver/diag/diagsvcs.cpp @@ -36,7 +36,7 @@ ReadLogFile(const std::string& Path, StringBuilderBase& Out) return true; } - catch (std::exception&) + catch (const std::exception&) { Out.Reset(); return false; diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 7a6d2dd22..6b31dc82e 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -247,7 +247,12 @@ ZenEntryPoint::Run() Server.Run(); } - catch (std::exception& e) + catch (const AssertException& AssertEx) + { + ZEN_CRITICAL("Caught assert exception in main for process {}: {}", zen::GetCurrentProcessId(), AssertEx.FullDescription()); + RequestApplicationExit(1); + } + catch (const std::exception& e) { ZEN_CRITICAL("Caught exception in main for process {}: {}", zen::GetCurrentProcessId(), e.what()); RequestApplicationExit(1); @@ -407,7 +412,12 @@ main(int argc, char* argv[]) return App.Run(); #endif } - catch (std::exception& Ex) + catch (const AssertException& AssertEx) + { + fprintf(stderr, "ERROR: Caught assert exception in main: '%s'", AssertEx.FullDescription().c_str()); + return 1; + } + catch (const std::exception& Ex) { fprintf(stderr, "ERROR: Caught exception in main: '%s'", Ex.what()); diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 4248bbf2a..764bea355 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -79,7 +79,7 @@ public: } Result.RawHash = IoHash::HashBuffer(Payload); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); @@ -108,7 +108,7 @@ public: Offset += Segment.GetSize(); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what()); diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index 1508dbc3f..6b1f591f0 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -276,7 +276,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi .Timeout = std::chrono::milliseconds(1800000), .AssumeHttp2 = Options.AssumeHttp2, .AllowResume = true, - .RetryCount = 2}; + .RetryCount = 4}; // 1) Access token as parameter in request // 2) Environment variable (different win vs linux/mac) // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index dd390d08c..84ed6f842 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -209,7 +209,7 @@ namespace { std::string(Url), std::string(Project), std::string(Oplog)}; - RemoteStore = CreateZenRemoteStore(Options); + RemoteStore = CreateZenRemoteStore(Options, TempFilePath); } if (!RemoteStore) @@ -1679,7 +1679,7 @@ ProjectStore::Project::WriteAccessTimes() WriteFile(ProjectAccessTimesFilePath, Data.GetBuffer().AsIoBuffer()); } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("writing access times FAILED, reason: '{}'", Err.what()); } @@ -1714,7 +1714,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem: Log->Write(); return Log; } - catch (std::exception&) + catch (const std::exception&) { // In case of failure we need to ensure there's no half constructed entry around // @@ -1760,7 +1760,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) return Log; } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); @@ -2371,7 +2371,7 @@ ProjectStore::OpenProject(std::string_view ProjectId) Prj->Read(); return Prj; } - catch (std::exception& e) + catch (const std::exception& e) { ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what()); m_Projects.erase(std::string{ProjectId}); @@ -4017,7 +4017,7 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) } } } - catch (std::exception&) + catch (const std::exception&) { while (!Checkers.empty()) { diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index ae4777278..8efb92e6b 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -835,7 +835,7 @@ BuildContainer(CidStore& ChunkStore, } } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), fmt::format("Failed to resolve attachment {}", RawHash), @@ -1216,7 +1216,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -1740,7 +1740,7 @@ SaveOplog(CidStore& ChunkStore, CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), Ex.what(), @@ -2658,7 +2658,7 @@ LoadOplog(CidStore& ChunkStore, ReportMessage(OptionalContext, fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksDownloaded.load(), diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index cfb558040..600338843 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -6,15 +6,10 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> #include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> #include <zencore/stream.h> -#include <zencore/timer.h> +#include <zenhttp/httpclient.h> #include <zenutil/packageformat.h> -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - namespace zen { using namespace std::literals; @@ -22,17 +17,16 @@ using namespace std::literals; class ZenRemoteStore : public RemoteProjectStore { public: - ZenRemoteStore(std::string_view HostAddress, - std::string_view Project, - std::string_view Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize) + ZenRemoteStore(std::string_view HostAddress, + std::string_view Project, + std::string_view Oplog, + const std::filesystem::path& TempFilePath) : m_HostAddress(HostAddress) , m_ProjectStoreUrl(fmt::format("{}/prj"sv, m_HostAddress)) , m_Project(Project) , m_Oplog(Oplog) - , m_MaxBlockSize(MaxBlockSize) - , m_MaxChunkEmbedSize(MaxChunkEmbedSize) + , m_TempFilePath(TempFilePath) + , m_Client(m_ProjectStoreUrl, {.LogCategory = "ZenRemoteStore", .RetryCount = 2}) { } @@ -47,39 +41,27 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/save"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - MemoryView Data(Payload.GetView()); - Session->SetBody({reinterpret_cast<const char*>(Data.GetData()), Data.GetSize()}); - cpr::Response Response = Session->Post(); - SaveResult Result = SaveResult{ConvertResult(Response)}; + std::string SaveRequest = fmt::format("/{}/oplog/{}/save"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCbObject); + SaveResult Result = SaveResult{ConvertResult(Response)}; if (Result.ErrorCode) { - Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}. Reason: '{}'", m_ProjectStoreUrl, m_Project, m_Oplog, Result.Reason); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } - IoBuffer ResponsePayload(IoBuffer::Wrap, Response.text.data(), Response.text.size()); - CbObject ResponseObject = LoadCompactBinaryObject(ResponsePayload); + CbObject ResponseObject = Response.AsObject(); if (!ResponseObject) { - Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, - m_ProjectStoreUrl, - m_Project, - m_Oplog); - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, + m_ProjectStoreUrl, + m_Project, + m_Oplog); + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); return Result; } CbArrayView NeedsArray = ResponseObject["need"sv].AsArrayView(); @@ -89,33 +71,15 @@ public: Result.Needs.insert(ChunkHash); } - Result.RawHash = IoHash::HashBuffer(Payload); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + Result.RawHash = IoHash::HashBuffer(Payload); return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; + std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); + SaveAttachmentResult Result = SaveAttachmentResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", @@ -125,14 +89,11 @@ public: RawHash, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override { - Stopwatch Timer; - CbPackage RequestPackage; { CbObjectWriter RequestWriter; @@ -151,26 +112,10 @@ public: RequestWriter.EndArray(); // "chunks" RequestPackage.SetObject(RequestWriter.Save()); } - CompositeBuffer Payload = FormatPackageMessageBuffer(RequestPackage, FormatFlags::kDefault); - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl({SaveRequest}); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); + std::string SaveRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + HttpClient::Response Response = m_Client.Post(SaveRequest, RequestPackage); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - cpr::Response Response = Session->Post(); - SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; + SaveAttachmentsResult Result = SaveAttachmentsResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving {} oplog attachments to {}/{}/{}. Reason: '{}'", @@ -180,17 +125,12 @@ public: m_Oplog, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/rpc"sv, m_ProjectStoreUrl, m_Project, m_Oplog); + std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); CbObject Request; { @@ -206,17 +146,22 @@ public: RequestWriter.EndArray(); // "chunks" Request = RequestWriter.Save(); } - IoBuffer Payload = Request.GetBuffer().AsIoBuffer(); - Session->SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Content-Type", std::string(MapContentTypeToString(HttpContentType::kCbObject))}, - {"Accept", std::string(MapContentTypeToString(HttpContentType::kCbPackage))}}); - cpr::Response Response = Session->Post(); - LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (!Result.ErrorCode) + HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); + + LoadAttachmentsResult Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) { - CbPackage Package = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size())); + Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", + RawHashes.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbPackage Package = Response.AsPackage(); std::span<const CbAttachment> Attachments = Package.GetAttachments(); Result.Chunks.reserve(Attachments.size()); for (const CbAttachment& Attachment : Attachments) @@ -225,42 +170,17 @@ public: std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); } } - else - { - Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", - RawHashes.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; }; - virtual FinalizeResult FinalizeContainer(const IoHash&) override - { - Stopwatch Timer; - - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.clear(); - return FinalizeResult{Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}}; - } + virtual FinalizeResult FinalizeContainer(const IoHash&) override { return FinalizeResult{Result{}}; } virtual LoadContainerResult LoadContainer() override { - Stopwatch Timer; + std::string LoadRequest = fmt::format("/{}/oplog/{}/load"sv, m_Project, m_Oplog); - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - std::string SaveRequest = fmt::format("{}/{}/oplog/{}/load"sv, m_ProjectStoreUrl, m_Project, m_Oplog); - Session->SetUrl(SaveRequest); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCbObject))}}); - Session->SetParameters( - {{"maxblocksize", fmt::format("{}", m_MaxBlockSize)}, {"maxchunkembedsize", fmt::format("{}", m_MaxChunkEmbedSize)}}); - cpr::Response Response = Session->Get(); - - LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; + HttpClient::Response Response = m_Client.Get(LoadRequest, HttpClient::Accept(ZenContentType::kCbObject)); + LoadContainerResult Result = LoadContainerResult{ConvertResult(Response)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}. Reason: '{}'", @@ -271,7 +191,7 @@ public: } else { - Result.ContainerObject = LoadCompactBinaryObject(IoBuffer(IoBuffer::Clone, Response.text.data(), Response.text.size())); + Result.ContainerObject = Response.AsObject(); if (!Result.ContainerObject) { Result.Reason = fmt::format("The response for {}/{}/{} is not formatted as a compact binary object"sv, @@ -281,7 +201,6 @@ public: Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); } } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } @@ -299,19 +218,14 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - Stopwatch Timer; - - std::unique_ptr<cpr::Session> Session(AllocateSession()); - auto _ = MakeGuard([this, &Session]() { ReleaseSession(std::move(Session)); }); - - std::string LoadRequest = fmt::format("{}/{}/oplog/{}/{}"sv, m_ProjectStoreUrl, m_Project, m_Oplog, RawHash); - Session->SetUrl({LoadRequest}); - Session->SetHeader({{"Accept", std::string(MapContentTypeToString(HttpContentType::kCompressedBinary))}}); - cpr::Response Response = Session->Get(); - LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + LoadAttachmentResult Result = LoadAttachmentResult{ConvertResult(Response)}; if (!Result.ErrorCode) { - Result.Bytes = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Bytes = Response.ResponsePayload; + Result.Bytes.MakeOwned(); } if (!Result.ErrorCode) { @@ -322,73 +236,40 @@ public: RawHash, Result.Reason); } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } private: - std::unique_ptr<cpr::Session> AllocateSession() - { - RwLock::ExclusiveLockScope _(SessionsLock); - if (Sessions.empty()) - { - Sessions.emplace_back(std::make_unique<cpr::Session>()); - } - std::unique_ptr<cpr::Session> Session = std::move(Sessions.back()); - Sessions.pop_back(); - return Session; - } - - void ReleaseSession(std::unique_ptr<cpr::Session>&& Session) + static Result ConvertResult(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { - RwLock::ExclusiveLockScope _(SessionsLock); - Sessions.emplace_back(std::move(Session)); - } - - static Result ConvertResult(const cpr::Response& Response) - { - std::string Text; - std::string Reason = Response.reason; - int32_t ErrorCode = 0; - if (Response.error.code != cpr::ErrorCode::OK) + if (Response.Error) { - ErrorCode = static_cast<int32_t>(Response.error.code); - if (!Response.error.message.empty()) - { - Reason = Response.error.message; - } + return {.ErrorCode = Response.Error.value().ErrorCode, + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(""), + .Text = Response.ToText()}; } - else if (!IsHttpSuccessCode(Response.status_code)) + if (!Response.IsSuccess()) { - ErrorCode = static_cast<int32_t>(Response.status_code); - - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType == zen::HttpContentType::kText) - { - Text = Response.text; - } - } - - Reason = fmt::format("{}"sv, Response.status_code); + return {.ErrorCode = static_cast<int32_t>(Response.StatusCode), + .ElapsedSeconds = Response.ElapsedSeconds, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Text = Response.ToText()}; } - return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.elapsed, .Reason = Reason, .Text = Text}; + return {.ErrorCode = 0, .ElapsedSeconds = Response.ElapsedSeconds}; } - RwLock SessionsLock; - std::vector<std::unique_ptr<cpr::Session>> Sessions; + const std::string m_HostAddress; + const std::string m_ProjectStoreUrl; + const std::string m_Project; + const std::string m_Oplog; + const std::filesystem::path m_TempFilePath; - const std::string m_HostAddress; - const std::string m_ProjectStoreUrl; - const std::string m_Project; - const std::string m_Oplog; - const size_t m_MaxBlockSize; - const size_t m_MaxChunkEmbedSize; + HttpClient m_Client; }; std::shared_ptr<RemoteProjectStore> -CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) +CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) @@ -397,7 +278,7 @@ CreateZenRemoteStore(const ZenRemoteStoreOptions& Options) Url = fmt::format("http://{}"sv, Url); } std::shared_ptr<RemoteProjectStore> RemoteStore = - std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, Options.MaxBlockSize, Options.MaxChunkEmbedSize); + std::make_shared<ZenRemoteStore>(Url, Options.ProjectId, Options.OplogId, TempFilePath); return RemoteStore; } diff --git a/src/zenserver/projectstore/zenremoteprojectstore.h b/src/zenserver/projectstore/zenremoteprojectstore.h index 9f079ee74..7c81a597d 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.h +++ b/src/zenserver/projectstore/zenremoteprojectstore.h @@ -13,6 +13,6 @@ struct ZenRemoteStoreOptions : RemoteStoreOptions std::string OplogId; }; -std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options); +std::shared_ptr<RemoteProjectStore> CreateZenRemoteStore(const ZenRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath); } // namespace zen diff --git a/src/zenserver/sentryintegration.cpp b/src/zenserver/sentryintegration.cpp index 11bf78a75..a8d967985 100644 --- a/src/zenserver/sentryintegration.cpp +++ b/src/zenserver/sentryintegration.cpp @@ -31,13 +31,10 @@ namespace sentry { struct SentryAssertImpl : zen::AssertImpl { - ZEN_FORCENOINLINE ZEN_DEBUG_SECTION SentryAssertImpl(); - virtual ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ~SentryAssertImpl(); virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) override; - AssertImpl* PrevAssertImpl; }; class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex> @@ -85,7 +82,7 @@ sentry_sink::sink_it_(const spdlog::details::log_msg& msg) sentry_event_value_add_stacktrace(event, NULL, 0); sentry_capture_event(event); } - catch (std::exception&) + catch (const std::exception&) { // If our logging with Message formatting fails we do a non-allocating version and just post the msg.payload raw char TmpBuffer[256]; @@ -105,16 +102,6 @@ sentry_sink::flush_() { } -SentryAssertImpl::SentryAssertImpl() : PrevAssertImpl(CurrentAssertImpl) -{ - CurrentAssertImpl = this; -} - -SentryAssertImpl::~SentryAssertImpl() -{ - CurrentAssertImpl = PrevAssertImpl; -} - void SentryAssertImpl::OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg) { @@ -128,7 +115,7 @@ SentryAssertImpl::OnAssert(const char* Filename, int LineNumber, const char* Fun sentry_event_value_add_stacktrace(event, NULL, 0); sentry_capture_event(event); } - catch (std::exception&) + catch (const std::exception&) { // If our logging with Message formatting fails we do a non-allocating version and just post the Msg raw sentry_value_t event = sentry_value_new_message_event( diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index dac29c273..6d1d026cc 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -152,7 +152,7 @@ namespace detail { return m_Status.EndpointStatus(); } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -292,7 +292,7 @@ namespace detail { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -388,7 +388,7 @@ namespace detail { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -615,7 +615,7 @@ namespace detail { }); } } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -825,7 +825,7 @@ namespace detail { return m_Status.EndpointStatus(); } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -861,7 +861,7 @@ namespace detail { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -984,7 +984,7 @@ namespace detail { return {.Status = {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}}; } } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -1405,7 +1405,7 @@ namespace detail { .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& Err) + catch (const std::exception& Err) { m_Status.Set(UpstreamEndpointState::kError, Err.what()); @@ -1980,7 +1980,7 @@ private: { ProcessCacheRecord(std::move(CacheRecord)); } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", CacheRecord.Namespace, @@ -2052,7 +2052,7 @@ private: } } } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); } diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp index 5ef89ee77..5c9f32c69 100644 --- a/src/zenserver/vfs/vfsimpl.cpp +++ b/src/zenserver/vfs/vfsimpl.cpp @@ -238,7 +238,7 @@ VfsService::Impl::VfsThread() m_VfsThreadRunning.Set(); m_VfsHost->Run(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("exception caught in VFS thread: {}", Ex.what()); diff --git a/src/zenserver/vfs/vfsservice.cpp b/src/zenserver/vfs/vfsservice.cpp index 04ba29ed2..d302a10ec 100644 --- a/src/zenserver/vfs/vfsservice.cpp +++ b/src/zenserver/vfs/vfsservice.cpp @@ -105,7 +105,7 @@ VfsService::VfsService() { m_Impl->Mount(Mountpath); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, Ex.what()); } @@ -123,7 +123,7 @@ VfsService::VfsService() { m_Impl->Unmount(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, Ex.what()); } diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index d1faeb8b6..e6e451952 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -761,7 +761,7 @@ ZenServer::Cleanup() m_Http = {}; m_JobQueue.reset(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("exception thrown during Cleanup() in {}: '{}'", ZEN_APP_NAME, Ex.what()); } @@ -831,7 +831,7 @@ ZenServer::CheckStateMarker() return; } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("state marker at {} could not be checked, reason: '{}'", StateMarkerPath, Ex.what()); RequestExit(1); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 69487f9dc..a576ff022 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -482,7 +482,11 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const { if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) { - return Block->GetChunk(Location.Offset, Location.Size); + IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size); + if (Chunk.GetSize() == Location.Size) + { + return Chunk; + } } } return IoBuffer(); @@ -911,7 +915,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, NewBlockFile = nullptr; } } - catch (std::system_error& SystemError) + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { @@ -926,11 +930,11 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ZEN_ERROR("reclaiming space for '{}' failed with system error exception: '{}'", m_BlocksBasePath, SystemError.what()); } } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("reclaiming space for '{}' ran out of memory: '{}'", m_BlocksBasePath, BadAlloc.what()); } - catch (std::exception& ex) + catch (const std::exception& ex) { ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); } diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d897e26ce..f53ab6f8b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -871,7 +871,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(const std::function<uin m_LogFlushPosition = LogCount; } } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); } @@ -1141,7 +1141,7 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc) con } IoBuffer -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const { ZEN_TRACE_CPU("Z$::Bucket::GetStandaloneCacheValue"); @@ -1152,9 +1152,11 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(ZenContentType ContentTy if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { - Data.SetContentType(ContentType); - - return Data; + if (Data.GetSize() == Loc.Size()) + { + Data.SetContentType(Loc.GetContentType()); + return Data; + } } return {}; @@ -1211,7 +1213,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - OutValue.Value = GetStandaloneCacheValue(Location.GetContentType(), HashKey); + OutValue.Value = GetStandaloneCacheValue(Location, HashKey); } else { @@ -1432,7 +1434,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() SaveSnapshot(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("Failed to flush bucket in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); } @@ -1538,7 +1540,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); WriteFile(ManifestPath, Buffer); } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } @@ -1621,7 +1623,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) else { // Structured cache value - IoBuffer Buffer = GetStandaloneCacheValue(Loc.GetContentType(), HashKey); + IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey); if (!Buffer) { ReportBadKey(HashKey); @@ -1880,7 +1882,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) IoBuffer Buffer; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - if (Buffer = GetStandaloneCacheValue(Loc.GetContentType(), Key); !Buffer) + if (Buffer = GetStandaloneCacheValue(Loc, Key); !Buffer) { continue; } @@ -1983,7 +1985,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) { SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("Failed to write index and manifest after GC in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); } @@ -2269,9 +2271,8 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(RwLock::SharedLockScope& IndexLo const BucketPayload& Payload = m_Payloads[Index]; if (Payload.Location.IsFlagSet(DiskLocation::kStructured)) { - IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) - ? GetStandaloneCacheValue(Payload.Location.GetContentType(), Key) - : GetInlineCacheValue(Payload.Location); + IoBuffer Value = Payload.Location.IsFlagSet(DiskLocation::kStandaloneFile) ? GetStandaloneCacheValue(Payload.Location, Key) + : GetInlineCacheValue(Payload.Location); CbObjectView Obj(Value.GetData()); Obj.IterateAttachments([&Attachments](CbFieldView Field) { Attachments.emplace_back(Field.AsAttachment()); }); } @@ -2920,7 +2921,7 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { SaveSnapshot([]() { return 0; }); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("Failed to write index and manifest after RemoveExpiredData in '{}'. Reason: '{}'", m_BucketDir, Ex.what()); } @@ -3033,7 +3034,7 @@ public: m_IndexLock.reset(); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("~DiskBucketReferenceChecker threw exception: '{}'", Ex.what()); } @@ -3066,7 +3067,7 @@ public: { m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); - std::vector<IoHash> StandaloneKeys; + std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; { std::vector<IoHash> InlineKeys; std::unordered_map<uint32_t, std::size_t> BlockIndexToEntriesPerBlockIndex; @@ -3099,7 +3100,7 @@ public: const IoHash& Key = Entry.first; if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - StandaloneKeys.push_back(Key); + StandaloneKeys.push_back(std::make_pair(Key, Loc)); continue; } @@ -3157,7 +3158,7 @@ public: } } } - for (const IoHash& Key : StandaloneKeys) + for (const auto& It : StandaloneKeys) { if (Ctx.IsCancelledFlag.load()) { @@ -3165,7 +3166,7 @@ public: return; } - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(ZenContentType::kCbObject, Key); + IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); if (!Buffer) { continue; @@ -3366,7 +3367,7 @@ ZenCacheDiskLayer::~ZenCacheDiskLayer() // This can cause a deadlock, if GC is running we would block while holding ZenCacheDiskLayer::m_Lock m_DroppedBuckets.clear(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("~ZenCacheDiskLayer() failed. Reason: '{}'", Ex.what()); } @@ -3490,7 +3491,7 @@ ZenCacheDiskLayer::DiscoverBuckets() { IsOk = DeleteDirectories(BadBucketPath); } - catch (std::exception&) + catch (const std::exception&) { } @@ -3633,14 +3634,14 @@ ZenCacheDiskLayer::Flush() { Bucket->Flush(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("Failed flushing bucket. Reason: '{}'", Ex.what()); } }); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } @@ -3877,7 +3878,7 @@ ZenCacheDiskLayer::MemCacheTrim() } }); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("Failed scheduling ZenCacheDiskLayer::MemCacheTrim. Reason: '{}'", Ex.what()); m_IsMemCacheTrimming.store(false); diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index daa628f77..c4ee6f4d3 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -400,7 +400,7 @@ ZenCacheStore::LogWorker() ObjectSize, ToString(Item.Value.Value.GetContentType())) } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_LOG_INFO(LogCacheActivity, "{} [{}] {}/{}/{} failed: Reason: '{}'", @@ -436,7 +436,7 @@ ZenCacheStore::LogWorker() m_LogEvent.Wait(); m_LogEvent.Reset(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("Log writer failed: '{}'", Ex.what()); } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 17cf20e35..84905df15 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -338,7 +338,7 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx) m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); } - catch (ScrubDeadlineExpiredException&) + catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("Scrubbing deadline expired, operation incomplete"); } @@ -934,7 +934,7 @@ CasContainerStrategy::MakeIndexSnapshot() EntryCount = Entries.size(); m_LogFlushPosition = IndexLogPosition; } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 428183827..0f3e2ab5a 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -729,19 +729,28 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) ZEN_ASSERT(m_IsInitialized); + uint64_t ExpectedSize = 0; { RwLock::SharedLockScope _(m_Lock); - if (!m_Index.contains(ChunkHash)) + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + ExpectedSize = It->second.Size; + } + else { return {}; } } - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); - return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); Chunk.GetSize() == ExpectedSize) + { + return Chunk; + } + + return {}; } bool @@ -1201,7 +1210,7 @@ FileCasStrategy::MakeIndexSnapshot() EntryCount = Entries.size(); m_LogFlushPosition = IndexLogPosition; } - catch (std::exception& Err) + catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 1a34019fb..d51144a5a 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -663,7 +663,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->second.CompactStoreStats); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed removing expired data for {}. Reason: '{}'", Owner->GetGcName(Ctx), Ex.what()); } @@ -733,7 +733,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner)); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed creating reference pruners for {}. Reason: '{}'", ReferenceStore->GetGcName(Ctx), @@ -806,7 +806,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed creating reference checkers for {}. Reason: '{}'", Referencer->GetGcName(Ctx), @@ -863,7 +863,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) SCOPED_TIMER(Stats->second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); Checker->PreCache(Ctx); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed precaching for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); } @@ -919,7 +919,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); Checker->LockState(Ctx); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); } @@ -997,7 +997,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->CompactStoreStats); } } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Pruner->GetGcName(Ctx), Ex.what()); } @@ -1563,7 +1563,7 @@ GcScheduler::AppendGCLog(GcClock::TimePoint StartTime, const GcSettings& Setting GcLogFile.Write(EntryBuffer, AppendPos); } } - catch (std::system_error& SystemError) + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { @@ -1578,11 +1578,11 @@ GcScheduler::AppendGCLog(GcClock::TimePoint StartTime, const GcSettings& Setting ZEN_ERROR("writing gc result failed with system error exception: '{}'", SystemError.what()); } } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("writing gc result ran out of memory: '{}'", BadAlloc.what()); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("writing gc result failed with: '{}'", Ex.what()); } @@ -1970,7 +1970,7 @@ GcScheduler::SchedulerThread() WaitTime = std::chrono::seconds(0); } - catch (std::system_error& SystemError) + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { @@ -1988,14 +1988,14 @@ GcScheduler::SchedulerThread() m_LastLightweightGcTime = m_LastGcTime; WaitTime = m_Config.MonitorInterval; } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("scheduling garbage collection ran out of memory: '{}'", BadAlloc.what()); m_LastGcTime = GcClock::Now(); m_LastLightweightGcTime = m_LastGcTime; WaitTime = m_Config.MonitorInterval; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("scheduling garbage collection failed with: '{}'", Ex.what()); m_LastGcTime = GcClock::Now(); @@ -2028,7 +2028,7 @@ GcScheduler::ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds Time Ctx.SetShouldDelete(DoDelete); m_GcManager.ScrubStorage(Ctx); } - catch (ScrubDeadlineExpiredException&) + catch (const ScrubDeadlineExpiredException&) { ZEN_INFO("scrubbing deadline expired (top level), operation incomplete!"); } @@ -2189,7 +2189,7 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, SchedulerState << "LastGcExpireTime"sv << static_cast<int64_t>(m_LastGcExpireTime.time_since_epoch().count()); SaveCompactBinaryObject(Path, SchedulerState.Save()); } - catch (std::system_error& SystemError) + catch (const std::system_error& SystemError) { if (IsOOM(SystemError.code())) { @@ -2204,11 +2204,11 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, ZEN_ERROR("writing gc scheduler state failed with system error exception: '{}'", SystemError.what()); } } - catch (std::bad_alloc& BadAlloc) + catch (const std::bad_alloc& BadAlloc) { ZEN_WARN("writing gc scheduler state ran out of memory: '{}'", BadAlloc.what()); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("writing gc scheduler state failed with: '{}'", Ex.what()); } diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 7aced67ad..471cc5dcd 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -326,7 +326,7 @@ public: void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); - IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 759af792d..9bef4d1a4 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -73,7 +73,7 @@ struct RecordedRequestsWriter WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); } @@ -455,7 +455,7 @@ RecordedRequestsSegmentWriter::EndWrite() WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); } @@ -562,7 +562,7 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); } } - catch (std::exception&) + catch (const std::exception&) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; @@ -738,7 +738,7 @@ RecordedRequestsWriter::WriterThreadMain() RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); } - catch (std::exception&) + catch (const std::exception&) { // TODO: what's the right behaviour here? The most likely cause would // be some I/O error and we probably ought to just shut down recording @@ -867,7 +867,7 @@ RecordedRequestsWriter::WriteRecordingMetadata() WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); } @@ -913,7 +913,7 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In return TotalRequestCount; } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_WARN("could not read metadata file: {}", Ex.what()); } @@ -950,7 +950,7 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In } } } - catch (std::exception&) + catch (const std::exception&) { } diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index e4a99fc30..ca4649ba8 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -60,7 +60,7 @@ public: RwLock::ExclusiveLockScope RotateLock(m_Lock); m_CurrentFile.Close(); } - catch (std::exception&) + catch (const std::exception&) { } } @@ -101,7 +101,7 @@ public: } } } - catch (std::exception&) + catch (const std::exception&) { // Silently eat errors } @@ -116,7 +116,7 @@ public: m_CurrentFile.Flush(); } } - catch (std::exception&) + catch (const std::exception&) { // Silently eat errors } @@ -129,7 +129,7 @@ public: RwLock::ExclusiveLockScope _(m_Lock); m_Formatter = spdlog::details::make_unique<spdlog::pattern_formatter>(pattern); } - catch (std::exception&) + catch (const std::exception&) { // Silently eat errors } @@ -141,7 +141,7 @@ public: RwLock::ExclusiveLockScope _(m_Lock); m_Formatter = std::move(sink_formatter); } - catch (std::exception&) + catch (const std::exception&) { // Silently eat errors } diff --git a/src/zenutil/openprocesscache.cpp b/src/zenutil/openprocesscache.cpp index 39e4aea90..fb654bde2 100644 --- a/src/zenutil/openprocesscache.cpp +++ b/src/zenutil/openprocesscache.cpp @@ -42,7 +42,7 @@ OpenProcessCache::~OpenProcessCache() } m_Sessions.clear(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("OpenProcessCache destructor failed with reason: `{}`", Ex.what()); } @@ -175,7 +175,7 @@ OpenProcessCache::GcWorker() { GCHandles(); } - catch (std::exception& Ex) + catch (const std::exception& Ex) { ZEN_ERROR("gc of open process cache failed with reason: `{}`", Ex.what()); } diff --git a/src/zenutil/packageformat.cpp b/src/zenutil/packageformat.cpp index 7c284a4e6..2e0f2dc7c 100644 --- a/src/zenutil/packageformat.cpp +++ b/src/zenutil/packageformat.cpp @@ -357,6 +357,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint BinaryReader Reader(Payload); + if (Payload.GetSize() < sizeof(CbPackageHeader)) + { + throw std::invalid_argument(fmt::format("invalid CbPackage, missing complete header (size {})", Payload.GetSize())); + } + CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); @@ -378,8 +383,8 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint tsl::robin_map<std::string, IoBuffer> PartialFileBuffers; - // TODO: Throwing before this loop completes could result in leaking handles as we might not have picked up all the handles in the - // message + std::vector<std::pair<uint32_t, std::string>> MalformedAttachments; + for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; @@ -438,30 +443,34 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint } } - if (!FullFileBuffer) + if (FullFileBuffer) { - // Unable to open chunk reference - throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", - i, - Path, - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize)); - } - - IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize() - ? FullFileBuffer - : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize() + ? FullFileBuffer + : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference))); - if (!CompBuf) + CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference))); + if (CompBuf) + { + Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); + } + else + { + MalformedAttachments.push_back(std::make_pair(i, + fmt::format("Invalid format in '{}' (offset {}, size {})", + Path, + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize))); + } + } + else { - throw std::invalid_argument(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})", - i, - Path, - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize)); + MalformedAttachments.push_back(std::make_pair(i, + fmt::format("Unable to resolve chunk at '{}' (offset {}, size {})", + Path, + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize))); } - Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); } else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { @@ -470,26 +479,39 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint if (i == 0) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (!CompBuf) + if (CompBuf) + { + Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); + } + else { - throw std::invalid_argument(fmt::format("invalid format for chunk #{} expected compressed buffer for CbObject", i)); + // First payload is always a compact binary object + MalformedAttachments.push_back(std::make_pair( + i, + fmt::format("Invalid format, expected compressed buffer for CbObject (size {})", AttachmentBuffer.GetSize()))); } - // First payload is always a compact binary object - Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else { - ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, compressed object attachments are not currently supported (size {})", + AttachmentBuffer.GetSize()))); } } else { CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (!CompBuf) + if (CompBuf) + { + Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); + } + else { - throw std::invalid_argument(fmt::format("invalid format for chunk #{} expected compressed buffer for attachment", i)); + MalformedAttachments.push_back(std::make_pair( + i, + fmt::format("Invalid format, expected compressed buffer for attachment (size {})", AttachmentBuffer.GetSize()))); } - Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); } } else /* not compressed */ @@ -502,7 +524,10 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint } else { - ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, object attachments are not currently supported (size {})", + AttachmentBuffer.GetSize()))); } } else @@ -521,6 +546,20 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Package.AddAttachments(Attachments); + using namespace std::literals; + + if (!MalformedAttachments.empty()) + { + StringBuilder<1024> SB; + SB << (uint64_t)MalformedAttachments.size() << " malformed attachments in package message:\n"; + for (const auto& It : MalformedAttachments) + { + SB << " #"sv << It.first << ": " << It.second << "\n"; + } + ZEN_WARN("{}", SB.ToView()); + throw std::invalid_argument(SB.ToString()); + } + return Package; } |