diff options
| author | Dan Engelbrecht <[email protected]> | 2025-02-25 15:48:43 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-02-25 15:48:43 +0100 |
| commit | 5bc5b0dd59c0f02afe553e5074dfe57951b19044 (patch) | |
| tree | 625d46a9ef656cd6dd5f2879182f686b0299f44b /src | |
| parent | 5.5.18 (diff) | |
| download | zen-5bc5b0dd59c0f02afe553e5074dfe57951b19044.tar.xz zen-5bc5b0dd59c0f02afe553e5074dfe57951b19044.zip | |
improvements and infrastructure for upcoming builds api command line (#284)
* add modification tick to filesystem traversal
* add ShowDetails option to ProgressBar
* log callstack if we terminate process
* handle chunking if MaxSize > 1MB
* BasicFile write helpers and WriteToTempFile simplifications
* bugfix for CompositeBuffer::IterateRange when using DecompressToComposite for actually comrpessed data
revert of earlier optimization
* faster compress/decompress for large disk-based files
* enable progress feedback in IoHash::HashBuffer
* add payload validation in HttpClient::Get
* fix range requests (range is including end byte)
* remove BuildPartId for blob/block related operations in builds api
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/copy_cmd.cpp | 6 | ||||
| -rw-r--r-- | src/zen/cmds/serve_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 20 | ||||
| -rw-r--r-- | src/zen/zen.h | 3 | ||||
| -rw-r--r-- | src/zencore/basicfile.cpp | 79 | ||||
| -rw-r--r-- | src/zencore/compositebuffer.cpp | 34 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 214 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 26 | ||||
| -rw-r--r-- | src/zencore/include/zencore/basicfile.h | 6 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 20 | ||||
| -rw-r--r-- | src/zencore/include/zencore/iohash.h | 4 | ||||
| -rw-r--r-- | src/zencore/iohash.cpp | 28 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 27 | ||||
| -rw-r--r-- | src/zenserver/objectstore/objectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 31 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 55 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/chunkedfile.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 13 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 85 | ||||
| -rw-r--r-- | src/zenutil/logging.cpp | 9 |
21 files changed, 399 insertions, 271 deletions
diff --git a/src/zen/cmds/copy_cmd.cpp b/src/zen/cmds/copy_cmd.cpp index d42d3c107..cc6ddd505 100644 --- a/src/zen/cmds/copy_cmd.cpp +++ b/src/zen/cmds/copy_cmd.cpp @@ -120,7 +120,11 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { } - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t, + uint64_t) override { ZEN_UNUSED(FileSize); std::error_code Ec; diff --git a/src/zen/cmds/serve_cmd.cpp b/src/zen/cmds/serve_cmd.cpp index 8e36e74ce..f87725e36 100644 --- a/src/zen/cmds/serve_cmd.cpp +++ b/src/zen/cmds/serve_cmd.cpp @@ -120,7 +120,7 @@ ServeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) struct FsVisitor : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { std::filesystem::path ServerPath = std::filesystem::relative(Parent / File, RootPath); std::string ServerPathString = reinterpret_cast<const char*>(ServerPath.generic_u8string().c_str()); diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index fd58b024a..872ea8941 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -24,6 +24,7 @@ #include "cmds/vfs_cmd.h" #include "cmds/workspaces_cmd.h" +#include <zencore/callstack.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -251,7 +252,10 @@ ZenCmdBase::ResolveTargetHostSpec(const std::string& InHostSpec) return ResolveTargetHostSpec(InHostSpec, /* out */ Dummy); } -ProgressBar::ProgressBar(bool PlainProgress) : m_PlainProgress(PlainProgress), m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000) +ProgressBar::ProgressBar(bool PlainProgress, bool ShowDetails) +: m_PlainProgress(PlainProgress) +, m_ShowDetails(ShowDetails) +, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000) { } @@ -270,6 +274,7 @@ ProgressBar::~ProgressBar() void ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) { + ZEN_ASSERT(NewState.TotalCount >= NewState.RemainingCount); if (DoLinebreak == false && m_State == NewState) { return; @@ -288,7 +293,8 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) if (m_PlainProgress) { - ZEN_CONSOLE("{} {}% ({})", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS)); + std::string Details = (m_ShowDetails && !NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : ""; + ZEN_CONSOLE("{} {}% ({}){}", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details); } else { @@ -327,7 +333,7 @@ ProgressBar::ForceLinebreak() void ProgressBar::Finish() { - if (m_LastOutputLength > 0 && m_State.RemainingCount > 0) + if (m_LastOutputLength > 0) { State NewState = m_State; NewState.RemainingCount = 0; @@ -367,7 +373,13 @@ main(int argc, char** argv) // Set output mode to handle virtual terminal sequences zen::logging::EnableVTMode(); - std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); }); + std::set_terminate([]() { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_CRITICAL("Program exited abnormally via std::terminate()\n{}", CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + }); LoggerRef DefaultLogger = zen::logging::Default(); auto& Sinks = DefaultLogger.SpdLogger->sinks(); diff --git a/src/zen/zen.h b/src/zen/zen.h index 9c9586050..835c2b6ac 100644 --- a/src/zen/zen.h +++ b/src/zen/zen.h @@ -84,7 +84,7 @@ public: uint64_t RemainingCount = 0; }; - explicit ProgressBar(bool PlainProgress); + explicit ProgressBar(bool PlainProgress, bool ShowDetails = true); ~ProgressBar(); void UpdateState(const State& NewState, bool DoLinebreak); @@ -95,6 +95,7 @@ public: private: const bool m_PlainProgress; + const bool m_ShowDetails; Stopwatch m_SW; uint64_t m_LastUpdateMS; State m_State; diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp index b3bffd34d..6e879ca0d 100644 --- a/src/zencore/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -281,6 +281,20 @@ BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<voi } uint64_t +BasicFile::Write(const CompositeBuffer& Data, uint64_t FileOffset) +{ + std::error_code Ec; + uint64_t WrittenBytes = Write(Data, FileOffset, Ec); + + if (Ec) + { + std::error_code Dummy; + throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); + } + return WrittenBytes; +} + +uint64_t BasicFile::Write(const CompositeBuffer& Data, uint64_t FileOffset, std::error_code& Ec) { uint64_t WrittenBytes = 0; @@ -309,6 +323,8 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec) void BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec) { + ZEN_ASSERT(m_FileHandle != nullptr); + Ec.clear(); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; @@ -817,6 +833,17 @@ BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset) } void +BasicFileWriter::Write(const CompositeBuffer& Data, uint64_t FileOffset) +{ + for (const SharedBuffer& Segment : Data.GetSegments()) + { + const uint64_t SegmentSize = Segment.GetSize(); + Write(Segment.GetData(), SegmentSize, FileOffset); + FileOffset += SegmentSize; + } +} + +void BasicFileWriter::Flush() { const uint64_t BufferedBytes = m_BufferEnd - m_BufferStart; @@ -831,23 +858,20 @@ BasicFileWriter::Flush() } IoBuffer -WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path, std::function<bool(std::error_code& Ec)>&& RetryCallback) +WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path) { - if (std::filesystem::is_regular_file(Path)) + TemporaryFile Temp; + std::error_code Ec; + Temp.CreateTemporary(Path.parent_path(), Ec); + if (Ec) { - IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path)); - if (ExistingTempFile && ExistingTempFile.GetSize() == Buffer.GetSize()) - { - ExistingTempFile.SetDeleteOnClose(true); - return ExistingTempFile; - } + throw std::system_error(Ec, fmt::format("Failed to create temp file for blob at '{}'", Path)); } - BasicFile BlockFile; - BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, std::move(RetryCallback)); - uint64_t Offset = 0; + { + uint64_t Offset = 0; static const uint64_t BufferingSize = 256u * 1024u; - BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2); + // BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2); for (const SharedBuffer& Segment : Buffer.GetSegments()) { size_t SegmentSize = Segment.GetSize(); @@ -859,22 +883,39 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path, std FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, - [&BufferedOutput, &Offset](const void* Data, size_t Size) { - BufferedOutput.Write(Data, Size, Offset); + [&Temp, &Offset](const void* Data, size_t Size) { + Temp.Write(Data, Size, Offset); Offset += Size; }); } else { - BufferedOutput.Write(Segment.GetData(), SegmentSize, Offset); + Temp.Write(Segment.GetData(), SegmentSize, Offset); Offset += SegmentSize; } } } - void* FileHandle = BlockFile.Detach(); - IoBuffer BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - BlockBuffer.SetDeleteOnClose(true); - return BlockBuffer; + + Temp.MoveTemporaryIntoPlace(Path, Ec); + if (Ec) + { + IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path); + if (TmpBuffer) + { + IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer); + const IoHash ExpectedHash = IoHash::HashBuffer(Buffer); + if (ExistingHash == ExpectedHash) + { + TmpBuffer.SetDeleteOnClose(true); + return TmpBuffer; + } + } + throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path)); + } + + IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path); + TmpBuffer.SetDeleteOnClose(true); + return TmpBuffer; } ////////////////////////////////////////////////////////////////////////// diff --git a/src/zencore/compositebuffer.cpp b/src/zencore/compositebuffer.cpp index 49870a304..252ac9045 100644 --- a/src/zencore/compositebuffer.cpp +++ b/src/zencore/compositebuffer.cpp @@ -275,36 +275,18 @@ CompositeBuffer::IterateRange(uint64_t Offset, Visitor(View, Segment); break; } - if (Offset < SegmentSize) + else if (Offset <= SegmentSize) { - if (Offset == 0 && Size >= SegmentSize) + const MemoryView View = Segment.GetView().Mid(Offset, Size); + Offset = 0; + if (Size == 0 || !View.IsEmpty()) { - const MemoryView View = Segment.GetView(); - if (!View.IsEmpty()) - { - Visitor(View, Segment); - } - Size -= View.GetSize(); - if (Size == 0) - { - break; - } + Visitor(View, Segment); } - else + Size -= View.GetSize(); + if (Size == 0) { - // If we only want a section of the segment, do a subrange so we don't have to materialize the entire iobuffer - IoBuffer SubRange(Segment.AsIoBuffer(), Offset, Min(Size, SegmentSize - Offset)); - const MemoryView View = SubRange.GetView(); - if (!View.IsEmpty()) - { - Visitor(View, Segment); - } - Size -= View.GetSize(); - if (Size == 0) - { - break; - } - Offset = 0; + break; } } else diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 29c1d9256..0e2ce2b54 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -2,6 +2,7 @@ #include <zencore/compress.h> +#include <zencore/basicfile.h> #include <zencore/blake3.h> #include <zencore/compositebuffer.h> #include <zencore/crc32.h> @@ -314,37 +315,77 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize) CompressedBlockSizes.reserve(BlockCount); uint64_t CompressedSize = 0; { - UniqueBuffer RawBlockCopy; MutableMemoryView CompressedBlocksView = CompressedData.GetMutableView() + sizeof(BufferHeader) + MetaSize; - CompositeBuffer::Iterator It = RawData.GetIterator(0); - - for (uint64_t RawOffset = 0; RawOffset < RawSize;) + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) { - const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); - const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy); - RawHash.Append(RawBlock); - - MutableMemoryView CompressedBlock = CompressedBlocksView; - if (!CompressBlock(CompressedBlock, RawBlock)) + ZEN_ASSERT(FileRef.FileHandle != nullptr); + UniqueBuffer RawBlockCopy = UniqueBuffer::Alloc(BlockSize); + BasicFile Source; + Source.Attach(FileRef.FileHandle); + for (uint64_t RawOffset = 0; RawOffset < RawSize;) { - return CompositeBuffer(); - } + const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); + Source.Read(RawBlockCopy.GetData(), RawBlockSize, FileRef.FileChunkOffset + RawOffset); + const MemoryView RawBlock = RawBlockCopy.GetView().Left(RawBlockSize); + RawHash.Append(RawBlock); + MutableMemoryView CompressedBlock = CompressedBlocksView; + if (!CompressBlock(CompressedBlock, RawBlock)) + { + Source.Detach(); + return CompositeBuffer(); + } - uint64_t CompressedBlockSize = CompressedBlock.GetSize(); - if (RawBlockSize <= CompressedBlockSize) - { - CompressedBlockSize = RawBlockSize; - CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock); + uint64_t CompressedBlockSize = CompressedBlock.GetSize(); + if (RawBlockSize <= CompressedBlockSize) + { + CompressedBlockSize = RawBlockSize; + CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock); + } + else + { + CompressedBlocksView += CompressedBlockSize; + } + + CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize)); + CompressedSize += CompressedBlockSize; + RawOffset += RawBlockSize; } - else + Source.Detach(); + } + else + { + UniqueBuffer RawBlockCopy; + CompositeBuffer::Iterator It = RawData.GetIterator(0); + + for (uint64_t RawOffset = 0; RawOffset < RawSize;) { - CompressedBlocksView += CompressedBlockSize; - } + const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); + const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy); + RawHash.Append(RawBlock); - CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize)); - CompressedSize += CompressedBlockSize; - RawOffset += RawBlockSize; + MutableMemoryView CompressedBlock = CompressedBlocksView; + if (!CompressBlock(CompressedBlock, RawBlock)) + { + return CompositeBuffer(); + } + + uint64_t CompressedBlockSize = CompressedBlock.GetSize(); + if (RawBlockSize <= CompressedBlockSize) + { + CompressedBlockSize = RawBlockSize; + CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock); + } + else + { + CompressedBlocksView += CompressedBlockSize; + } + + CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize)); + CompressedSize += CompressedBlockSize; + RawOffset += RawBlockSize; + } } } @@ -560,51 +601,118 @@ BlockDecoder::TryDecompressTo(const BufferHeader& Header, CompressedOffset += CompressedBlockSize; } - for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) { - const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; - const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); - const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; + ZEN_ASSERT(FileRef.FileHandle != nullptr); + BasicFile Source; + Source.Attach(FileRef.FileHandle); - const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock) - : zen::Min(RemainingRawSize, BlockSize); + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + { + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; - MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy); + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 + ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); - if (IsCompressed) - { - MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress); + if (CompressedBlockCopy.GetSize() < CompressedBlockSize) + { + CompressedBlockCopy = UniqueBuffer::Alloc(CompressedBlockSize); + } + Source.Read(CompressedBlockCopy.GetData(), CompressedBlockSize, FileRef.FileChunkOffset + CompressedOffset); - const bool IsAligned = BytesToUncompress == UncompressedBlockSize; - if (!IsAligned) + MemoryView CompressedBlock = CompressedBlockCopy.GetView().Left(CompressedBlockSize); + + if (IsCompressed) { - // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries. - if (UncompressedBlockCopy.IsNull()) + MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress); + + const bool IsAligned = BytesToUncompress == UncompressedBlockSize; + if (!IsAligned) { - UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize); + // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries. + if (UncompressedBlockCopy.IsNull()) + { + UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize); + } + UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize); } - UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize); - } - if (!DecompressBlock(UncompressedBlock, CompressedBlock)) - { - return false; - } + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + Source.Detach(); + return false; + } - if (!IsAligned) + if (!IsAligned) + { + RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); + } + } + else { - RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); + RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + RawView += BytesToUncompress; } - else + Source.Detach(); + } + else + { + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) { - RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); - } + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; - OffsetInFirstBlock = 0; - RemainingRawSize -= BytesToUncompress; - CompressedOffset += CompressedBlockSize; - RawView += BytesToUncompress; + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 + ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); + + MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy); + + if (IsCompressed) + { + MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress); + + const bool IsAligned = BytesToUncompress == UncompressedBlockSize; + if (!IsAligned) + { + // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries. + if (UncompressedBlockCopy.IsNull()) + { + UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize); + } + UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize); + } + + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + return false; + } + + if (!IsAligned) + { + RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); + } + } + else + { + RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress)); + } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + RawView += BytesToUncompress; + } } return RemainingRawSize == 0; diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index b8c35212f..5716d1255 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -683,7 +683,7 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop { } - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { std::error_code Ec; const std::filesystem::path Relative = std::filesystem::relative(Parent, BasePath, Ec); @@ -1236,7 +1236,11 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr } else { - Visitor.VisitFile(RootDir, FileName, DirInfo->EndOfFile.QuadPart, gsl::narrow<uint32_t>(DirInfo->FileAttributes)); + Visitor.VisitFile(RootDir, + FileName, + DirInfo->EndOfFile.QuadPart, + gsl::narrow<uint32_t>(DirInfo->FileAttributes), + (uint64_t)DirInfo->LastWriteTime.QuadPart); } const uint64_t NextOffset = DirInfo->NextEntryOffset; @@ -1285,7 +1289,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr } else if (S_ISREG(Stat.st_mode)) { - Visitor.VisitFile(RootDir, FileName, Stat.st_size, gsl::narrow<uint32_t>(Stat.st_mode)); + Visitor.VisitFile(RootDir, FileName, Stat.st_size, gsl::narrow<uint32_t>(Stat.st_mode), gsl::narrow<uint64_t>(Stat.st_mtime)); } else { @@ -1544,7 +1548,8 @@ GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, - uint32_t NativeModeOrAttributes) override + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) override { if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles)) { @@ -1557,6 +1562,10 @@ GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags { Content.FileAttributes.push_back(NativeModeOrAttributes); } + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeModificationTick)) + { + Content.FileModificationTicks.push_back(NativeModificationTick); + } } } @@ -1612,7 +1621,8 @@ GetDirectoryContent(const std::filesystem::path& RootDir, virtual void VisitFile(const std::filesystem::path&, const path_view& File, uint64_t FileSize, - uint32_t NativeModeOrAttributes) override + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) override { if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles)) { @@ -1625,6 +1635,10 @@ GetDirectoryContent(const std::filesystem::path& RootDir, { Content.FileAttributes.push_back(NativeModeOrAttributes); } + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeModificationTick)) + { + Content.FileModificationTicks.push_back(NativeModificationTick); + } } } @@ -1928,7 +1942,7 @@ TEST_CASE("filesystem") // Traversal struct : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t, uint64_t) override { bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected); } diff --git a/src/zencore/include/zencore/basicfile.h b/src/zencore/include/zencore/basicfile.h index 7edd40c9c..a78132879 100644 --- a/src/zencore/include/zencore/basicfile.h +++ b/src/zencore/include/zencore/basicfile.h @@ -60,6 +60,7 @@ public: void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); void Write(MemoryView Data, uint64_t FileOffset); void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec); + uint64_t Write(const CompositeBuffer& Data, uint64_t FileOffset); uint64_t Write(const CompositeBuffer& Data, uint64_t FileOffset, std::error_code& Ec); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); @@ -174,6 +175,7 @@ public: ~BasicFileWriter(); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); + void Write(const CompositeBuffer& Data, uint64_t FileOffset); void Flush(); private: @@ -184,9 +186,7 @@ private: uint64_t m_BufferEnd; }; -IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, - const std::filesystem::path& Path, - std::function<bool(std::error_code& Ec)>&& RetryCallback); +IoBuffer WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path); ZENCORE_API void basicfile_forcelink(); diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index ca8682cd7..250745e86 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -203,7 +203,8 @@ public: virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, - uint32_t NativeModeOrAttributes) = 0; + uint32_t NativeModeOrAttributes, + uint64_t NativeModificationTick) = 0; // This should return true if we should recurse into the directory virtual bool VisitDirectory(const std::filesystem::path& Parent, @@ -216,13 +217,14 @@ public: enum class DirectoryContentFlags : uint8_t { - None = 0, - IncludeDirs = 1u << 0, - IncludeFiles = 1u << 1, - Recursive = 1u << 2, - IncludeFileSizes = 1u << 3, - IncludeAttributes = 1u << 4, - IncludeAllEntries = IncludeDirs | IncludeFiles | Recursive + None = 0, + IncludeDirs = 1u << 0, + IncludeFiles = 1u << 1, + Recursive = 1u << 2, + IncludeFileSizes = 1u << 3, + IncludeAttributes = 1u << 4, + IncludeModificationTick = 1u << 5, + IncludeAllEntries = IncludeDirs | IncludeFiles | Recursive }; ENUM_CLASS_FLAGS(DirectoryContentFlags) @@ -232,6 +234,7 @@ struct DirectoryContent std::vector<std::filesystem::path> Files; std::vector<uint64_t> FileSizes; std::vector<uint32_t> FileAttributes; + std::vector<uint64_t> FileModificationTicks; std::vector<std::filesystem::path> Directories; std::vector<uint32_t> DirectoryAttributes; }; @@ -246,6 +249,7 @@ public: std::vector<std::filesystem::path> FileNames; std::vector<uint64_t> FileSizes; std::vector<uint32_t> FileAttributes; + std::vector<uint64_t> FileModificationTicks; std::vector<std::filesystem::path> DirectoryNames; std::vector<uint32_t> DirectoryAttributes; }; diff --git a/src/zencore/include/zencore/iohash.h b/src/zencore/include/zencore/iohash.h index 8871a5895..7443e17b7 100644 --- a/src/zencore/include/zencore/iohash.h +++ b/src/zencore/include/zencore/iohash.h @@ -47,8 +47,8 @@ struct IoHash static IoHash HashBuffer(const void* data, size_t byteCount); static IoHash HashBuffer(MemoryView Data) { return HashBuffer(Data.GetData(), Data.GetSize()); } - static IoHash HashBuffer(const CompositeBuffer& Buffer); - static IoHash HashBuffer(const IoBuffer& Buffer); + static IoHash HashBuffer(const CompositeBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes = nullptr); + static IoHash HashBuffer(const IoBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes = nullptr); static IoHash FromHexString(const char* string); static IoHash FromHexString(const std::string_view string); static bool TryParse(std::string_view Str, IoHash& Hash); diff --git a/src/zencore/iohash.cpp b/src/zencore/iohash.cpp index 7200e6e3f..3b2af0db4 100644 --- a/src/zencore/iohash.cpp +++ b/src/zencore/iohash.cpp @@ -30,7 +30,7 @@ IoHash::HashBuffer(const void* data, size_t byteCount) } IoHash -IoHash::HashBuffer(const CompositeBuffer& Buffer) +IoHash::HashBuffer(const CompositeBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes) { IoHashStream Hasher; @@ -46,11 +46,21 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer) FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, - [&Hasher](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + [&Hasher, ProcessedBytes](const void* Data, size_t Size) { + Hasher.Append(Data, Size); + if (ProcessedBytes != nullptr) + { + ProcessedBytes->fetch_add(Size); + } + }); } else { Hasher.Append(Segment.GetData(), SegmentSize); + if (ProcessedBytes != nullptr) + { + ProcessedBytes->fetch_add(SegmentSize); + } } } @@ -58,7 +68,7 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer) } IoHash -IoHash::HashBuffer(const IoBuffer& Buffer) +IoHash::HashBuffer(const IoBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes) { IoHashStream Hasher; @@ -71,11 +81,21 @@ IoHash::HashBuffer(const IoBuffer& Buffer) FileRef.FileChunkOffset, FileRef.FileChunkSize, BufferingSize, - [&Hasher](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + [&Hasher, ProcessedBytes](const void* Data, size_t Size) { + Hasher.Append(Data, Size); + if (ProcessedBytes != nullptr) + { + ProcessedBytes->fetch_add(Size); + } + }); } else { Hasher.Append(Buffer.GetData(), BufferSize); + if (ProcessedBytes != nullptr) + { + ProcessedBytes->fetch_add(BufferSize); + } } return Hasher.GetHash(); diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 211a5d05c..bb15a6fce 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -449,12 +449,27 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } static cpr::Response -DoWithRetry(std::string_view SessionId, std::function<cpr::Response()>&& Func, uint8_t RetryCount) +DoWithRetry( + std::string_view SessionId, + std::function<cpr::Response()>&& Func, + uint8_t RetryCount, + std::function<bool(cpr::Response& Result)>&& Validate = [](cpr::Response&) { return true; }) { uint8_t Attempt = 0; cpr::Response Result = Func(); - while (Attempt < RetryCount && ShouldRetry(Result)) + while (Attempt < RetryCount) { + if (!ShouldRetry(Result)) + { + if (Result.error || !IsHttpSuccessCode(Result.status_code)) + { + break; + } + if (Validate(Result)) + { + break; + } + } Sleep(100 * (Attempt + 1)); Attempt++; ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); @@ -881,7 +896,11 @@ HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); return Sess.Get(); }, - m_ConnectionSettings.RetryCount)); + m_ConnectionSettings.RetryCount, + [](cpr::Response& Result) { + std::unique_ptr<detail::TempPayloadFile> NoTempFile; + return ValidatePayload(Result, NoTempFile); + })); } HttpClient::Response @@ -1247,7 +1266,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold { uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - std::string Range = fmt::format("bytes={}-{}", DownloadedSize, ContentLength.value()); + std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1); if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) { if (RangeIt->second == Range) diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index b0212ab07..5d96de225 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -376,7 +376,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Writer.BeginArray("Contents"sv); } - void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { const fs::path FullPath = Parent / fs::path(File); fs::path RelativePath = fs::relative(FullPath, BucketPath); diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index 412769174..e4e91104c 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -123,18 +123,17 @@ public: JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult PutResult = - Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); + Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); return Result; @@ -147,18 +146,16 @@ public: IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = - Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); + JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); AddStats(PutMetaResult); RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); if (MetaDataResult.ErrorCode) { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, MetaDataResult.Reason); } @@ -311,30 +308,28 @@ public: { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, Result.Reason); return Result; } if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, - m_BuildId, - m_OplogBuildPartId); + m_BuildId); return Result; } std::optional<std::vector<ChunkBlockDescription>> Blocks = @@ -342,12 +337,11 @@ public: if (!Blocks) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a list of blocks"sv, + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, - m_BuildId, - m_OplogBuildPartId); + m_BuildId); return Result; } Result.Blocks = std::move(Blocks.value()); @@ -358,18 +352,17 @@ public: { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); } diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 0285cc22f..b4b2c6fc4 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1466,10 +1466,7 @@ BuildContainer(CidStore& ChunkStore, }; std::vector<ChunkedFile> ChunkedFiles; - auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash, - IoBuffer& RawData, - const IoBufferFileReference& FileRef, - JobContext*) -> ChunkedFile { + auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { ChunkedFile Chunked; Stopwatch Timer; @@ -1578,22 +1575,7 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - uint32_t RetriesLeft = 3; - - IoBuffer TempAttachmentBuffer = - WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary attachment '{}': '{}', retries left: {}.", - AttachmentPath, - Ec.message(), - RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), @@ -1612,34 +1594,21 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - uint32_t RetriesLeft = 3; - IoBuffer TempAttachmentBuffer = - WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary attachment '{}': '{}', retries left: {}.", - AttachmentPath, - Ec.message(), - RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); + uint64_t CompressedSize = Compressed.GetCompressedSize(); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); - if (Compressed.GetCompressedSize() > MaxChunkEmbedSize) + if (CompressedSize > MaxChunkEmbedSize) { OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { - UploadAttachment->Size = Compressed.GetCompressedSize(); + UploadAttachment->Size = CompressedSize; ResolveLock.WithExclusiveLock( [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); @@ -2362,17 +2331,7 @@ SaveOplog(CidStore& ChunkStore, BlockPath.append(Block.BlockHash.ToHexString()); try { - uint32_t RetriesLeft = 3; - IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); + IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 14123528c..34db51aa9 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -185,7 +185,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN // in this folder as well struct Visitor : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t) override + virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t, uint64_t) override { // We don't care about files } @@ -1174,7 +1174,7 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp index c08492eb0..3f3a6661c 100644 --- a/src/zenutil/chunkedfile.cpp +++ b/src/zenutil/chunkedfile.cpp @@ -121,7 +121,7 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para Chunker.SetUseThreshold(Params.UseThreshold); Chunker.SetChunkSize(Params.MinSize, Params.MaxSize, Params.AvgSize); size_t End = Offset + Size; - const size_t ScanBufferSize = 1u * 1024 * 1024; // (Params.MaxSize * 9) / 3;//1 * 1024 * 1024; + const size_t ScanBufferSize = Max(1u * 1024 * 1024, Params.MaxSize); BasicFileBuffer RawBuffer(RawData, ScanBufferSize); MemoryView SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset); ZEN_ASSERT(!SliceView.IsEmpty()); diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 075c35b40..852271868 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -116,21 +116,18 @@ public: JupiterResult PutBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, ZenContentType ContentType, const CompositeBuffer& Payload); JupiterResult GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, std::filesystem::path TempFolderPath); JupiterResult PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, ZenContentType ContentType, uint64_t PayloadSize, @@ -139,7 +136,6 @@ public: JupiterResult GetMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, @@ -147,7 +143,6 @@ public: JupiterResult PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, const IoBuffer& Payload); FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, @@ -155,12 +150,8 @@ public: const Oid& BuildId, const Oid& PartId, const IoHash& RawHash); - JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); - JupiterResult GetBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - IoBuffer Payload); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload); private: inline LoggerRef Log() { return m_Log; } diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index d56927b44..06ac6ae36 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -436,15 +436,14 @@ JupiterResult JupiterSession::PutBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, ZenContentType ContentType, const CompositeBuffer& Payload) { - HttpClient::Response Response = m_HttpClient.Upload( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload, - ContentType); + HttpClient::Response Response = + m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload, + ContentType); return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); } @@ -452,7 +451,6 @@ JupiterResult JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, ZenContentType ContentType, uint64_t PayloadSize, @@ -498,12 +496,8 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); - std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/startMultipartUpload", - Namespace, - BucketId, - BuildId, - PartId, - Hash.ToHexString()); + std::string StartMultipartResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); HttpClient::Response StartMultipartResponse = m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); @@ -529,15 +523,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) { - OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, PartId, Hash, ContentType, Workload, PartIndex]( + OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( bool& OutIsComplete) -> JupiterResult { const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); - std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}", + std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", Namespace, BucketId, BuildId, - PartId, Hash.ToHexString(), Part.QueryString); // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); @@ -571,12 +564,7 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, CbObject CompletePayload = CompletePayloadWriter.Save(); std::string MultipartEndResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/completeMultipart", - Namespace, - BucketId, - BuildId, - PartId, - Hash.ToHexString()); + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, CompletePayload, @@ -600,13 +588,12 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; IoBuffer RetryPartPayload = - Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte); + Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); std::string RetryMultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}", + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", Namespace, BucketId, BuildId, - RetryPartId, Hash.ToHexString(), RetryPart.QueryString); @@ -642,14 +629,12 @@ JupiterResult JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, std::vector<std::function<JupiterResult()>>& OutWorkItems) { - std::string RequestUrl = - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()); + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); HttpClient::Response Response = m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); if (Response.IsSuccess()) @@ -683,17 +668,12 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa { uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); OutWorkItems.emplace_back( - [this, Namespace, BucketId, BuildId, PartId, Hash, TotalSize, Workload, Offset, PartSize]() - -> JupiterResult { - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", - Namespace, - BucketId, - BuildId, - PartId, - Hash.ToHexString()); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult { + std::string RequestUrl = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); if (Response.IsSuccess()) { uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); @@ -717,13 +697,12 @@ JupiterResult JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, std::filesystem::path TempFolderPath) { - HttpClient::Response Response = m_HttpClient.Download( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - TempFolderPath); + HttpClient::Response Response = + m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), + TempFolderPath); return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -732,14 +711,13 @@ JupiterResult JupiterSession::PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Put( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload); return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); } @@ -772,24 +750,19 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace, } JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) { - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } JupiterResult -JupiterSession::GetBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - IoBuffer Payload) +JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId, PartId), + m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp index 6314c407f..0444fa2c4 100644 --- a/src/zenutil/logging.cpp +++ b/src/zenutil/logging.cpp @@ -10,6 +10,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <spdlog/spdlog.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <zencore/callstack.h> #include <zencore/compactbinary.h> #include <zencore/filesystem.h> #include <zencore/logging.h> @@ -97,7 +98,13 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) } } - std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); }); + std::set_terminate([]() { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_CRITICAL("Program exited abnormally via std::terminate()\n{}", CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + }); // Default |