aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-25 15:48:43 +0100
committerGitHub Enterprise <[email protected]>2025-02-25 15:48:43 +0100
commit5bc5b0dd59c0f02afe553e5074dfe57951b19044 (patch)
tree625d46a9ef656cd6dd5f2879182f686b0299f44b /src
parent5.5.18 (diff)
downloadzen-5bc5b0dd59c0f02afe553e5074dfe57951b19044.tar.xz
zen-5bc5b0dd59c0f02afe553e5074dfe57951b19044.zip
improvements and infrastructure for upcoming builds api command line (#284)
* add modification tick to filesystem traversal * add ShowDetails option to ProgressBar * log callstack if we terminate process * handle chunking if MaxSize > 1MB * BasicFile write helpers and WriteToTempFile simplifications * bugfix for CompositeBuffer::IterateRange when using DecompressToComposite for actually comrpessed data revert of earlier optimization * faster compress/decompress for large disk-based files * enable progress feedback in IoHash::HashBuffer * add payload validation in HttpClient::Get * fix range requests (range is including end byte) * remove BuildPartId for blob/block related operations in builds api
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/copy_cmd.cpp6
-rw-r--r--src/zen/cmds/serve_cmd.cpp2
-rw-r--r--src/zen/zen.cpp20
-rw-r--r--src/zen/zen.h3
-rw-r--r--src/zencore/basicfile.cpp79
-rw-r--r--src/zencore/compositebuffer.cpp34
-rw-r--r--src/zencore/compress.cpp214
-rw-r--r--src/zencore/filesystem.cpp26
-rw-r--r--src/zencore/include/zencore/basicfile.h6
-rw-r--r--src/zencore/include/zencore/filesystem.h20
-rw-r--r--src/zencore/include/zencore/iohash.h4
-rw-r--r--src/zencore/iohash.cpp28
-rw-r--r--src/zenhttp/httpclient.cpp27
-rw-r--r--src/zenserver/objectstore/objectstore.cpp2
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp31
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp55
-rw-r--r--src/zenstore/filecas.cpp4
-rw-r--r--src/zenutil/chunkedfile.cpp2
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h13
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp85
-rw-r--r--src/zenutil/logging.cpp9
21 files changed, 399 insertions, 271 deletions
diff --git a/src/zen/cmds/copy_cmd.cpp b/src/zen/cmds/copy_cmd.cpp
index d42d3c107..cc6ddd505 100644
--- a/src/zen/cmds/copy_cmd.cpp
+++ b/src/zen/cmds/copy_cmd.cpp
@@ -120,7 +120,11 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
}
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path& Parent,
+ const path_view& File,
+ uint64_t FileSize,
+ uint32_t,
+ uint64_t) override
{
ZEN_UNUSED(FileSize);
std::error_code Ec;
diff --git a/src/zen/cmds/serve_cmd.cpp b/src/zen/cmds/serve_cmd.cpp
index 8e36e74ce..f87725e36 100644
--- a/src/zen/cmds/serve_cmd.cpp
+++ b/src/zen/cmds/serve_cmd.cpp
@@ -120,7 +120,7 @@ ServeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
struct FsVisitor : public FileSystemTraversal::TreeVisitor
{
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override
{
std::filesystem::path ServerPath = std::filesystem::relative(Parent / File, RootPath);
std::string ServerPathString = reinterpret_cast<const char*>(ServerPath.generic_u8string().c_str());
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index fd58b024a..872ea8941 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -24,6 +24,7 @@
#include "cmds/vfs_cmd.h"
#include "cmds/workspaces_cmd.h"
+#include <zencore/callstack.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -251,7 +252,10 @@ ZenCmdBase::ResolveTargetHostSpec(const std::string& InHostSpec)
return ResolveTargetHostSpec(InHostSpec, /* out */ Dummy);
}
-ProgressBar::ProgressBar(bool PlainProgress) : m_PlainProgress(PlainProgress), m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000)
+ProgressBar::ProgressBar(bool PlainProgress, bool ShowDetails)
+: m_PlainProgress(PlainProgress)
+, m_ShowDetails(ShowDetails)
+, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000)
{
}
@@ -270,6 +274,7 @@ ProgressBar::~ProgressBar()
void
ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
{
+ ZEN_ASSERT(NewState.TotalCount >= NewState.RemainingCount);
if (DoLinebreak == false && m_State == NewState)
{
return;
@@ -288,7 +293,8 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
if (m_PlainProgress)
{
- ZEN_CONSOLE("{} {}% ({})", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS));
+ std::string Details = (m_ShowDetails && !NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : "";
+ ZEN_CONSOLE("{} {}% ({}){}", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details);
}
else
{
@@ -327,7 +333,7 @@ ProgressBar::ForceLinebreak()
void
ProgressBar::Finish()
{
- if (m_LastOutputLength > 0 && m_State.RemainingCount > 0)
+ if (m_LastOutputLength > 0)
{
State NewState = m_State;
NewState.RemainingCount = 0;
@@ -367,7 +373,13 @@ main(int argc, char** argv)
// Set output mode to handle virtual terminal sequences
zen::logging::EnableVTMode();
- std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); });
+ std::set_terminate([]() {
+ void* Frames[8];
+ uint32_t FrameCount = GetCallstack(2, 8, Frames);
+ CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
+ ZEN_CRITICAL("Program exited abnormally via std::terminate()\n{}", CallstackToString(Callstack, " "));
+ FreeCallstack(Callstack);
+ });
LoggerRef DefaultLogger = zen::logging::Default();
auto& Sinks = DefaultLogger.SpdLogger->sinks();
diff --git a/src/zen/zen.h b/src/zen/zen.h
index 9c9586050..835c2b6ac 100644
--- a/src/zen/zen.h
+++ b/src/zen/zen.h
@@ -84,7 +84,7 @@ public:
uint64_t RemainingCount = 0;
};
- explicit ProgressBar(bool PlainProgress);
+ explicit ProgressBar(bool PlainProgress, bool ShowDetails = true);
~ProgressBar();
void UpdateState(const State& NewState, bool DoLinebreak);
@@ -95,6 +95,7 @@ public:
private:
const bool m_PlainProgress;
+ const bool m_ShowDetails;
Stopwatch m_SW;
uint64_t m_LastUpdateMS;
State m_State;
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index b3bffd34d..6e879ca0d 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -281,6 +281,20 @@ BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<voi
}
uint64_t
+BasicFile::Write(const CompositeBuffer& Data, uint64_t FileOffset)
+{
+ std::error_code Ec;
+ uint64_t WrittenBytes = Write(Data, FileOffset, Ec);
+
+ if (Ec)
+ {
+ std::error_code Dummy;
+ throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
+ }
+ return WrittenBytes;
+}
+
+uint64_t
BasicFile::Write(const CompositeBuffer& Data, uint64_t FileOffset, std::error_code& Ec)
{
uint64_t WrittenBytes = 0;
@@ -309,6 +323,8 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec)
void
BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
{
+ ZEN_ASSERT(m_FileHandle != nullptr);
+
Ec.clear();
const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
@@ -817,6 +833,17 @@ BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
}
void
+BasicFileWriter::Write(const CompositeBuffer& Data, uint64_t FileOffset)
+{
+ for (const SharedBuffer& Segment : Data.GetSegments())
+ {
+ const uint64_t SegmentSize = Segment.GetSize();
+ Write(Segment.GetData(), SegmentSize, FileOffset);
+ FileOffset += SegmentSize;
+ }
+}
+
+void
BasicFileWriter::Flush()
{
const uint64_t BufferedBytes = m_BufferEnd - m_BufferStart;
@@ -831,23 +858,20 @@ BasicFileWriter::Flush()
}
IoBuffer
-WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path, std::function<bool(std::error_code& Ec)>&& RetryCallback)
+WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path)
{
- if (std::filesystem::is_regular_file(Path))
+ TemporaryFile Temp;
+ std::error_code Ec;
+ Temp.CreateTemporary(Path.parent_path(), Ec);
+ if (Ec)
{
- IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path));
- if (ExistingTempFile && ExistingTempFile.GetSize() == Buffer.GetSize())
- {
- ExistingTempFile.SetDeleteOnClose(true);
- return ExistingTempFile;
- }
+ throw std::system_error(Ec, fmt::format("Failed to create temp file for blob at '{}'", Path));
}
- BasicFile BlockFile;
- BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, std::move(RetryCallback));
- uint64_t Offset = 0;
+
{
+ uint64_t Offset = 0;
static const uint64_t BufferingSize = 256u * 1024u;
- BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2);
+ // BasicFileWriter BufferedOutput(BlockFile, BufferingSize / 2);
for (const SharedBuffer& Segment : Buffer.GetSegments())
{
size_t SegmentSize = Segment.GetSize();
@@ -859,22 +883,39 @@ WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path, std
FileRef.FileChunkOffset,
FileRef.FileChunkSize,
BufferingSize,
- [&BufferedOutput, &Offset](const void* Data, size_t Size) {
- BufferedOutput.Write(Data, Size, Offset);
+ [&Temp, &Offset](const void* Data, size_t Size) {
+ Temp.Write(Data, Size, Offset);
Offset += Size;
});
}
else
{
- BufferedOutput.Write(Segment.GetData(), SegmentSize, Offset);
+ Temp.Write(Segment.GetData(), SegmentSize, Offset);
Offset += SegmentSize;
}
}
}
- void* FileHandle = BlockFile.Detach();
- IoBuffer BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
- BlockBuffer.SetDeleteOnClose(true);
- return BlockBuffer;
+
+ Temp.MoveTemporaryIntoPlace(Path, Ec);
+ if (Ec)
+ {
+ IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path);
+ if (TmpBuffer)
+ {
+ IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer);
+ const IoHash ExpectedHash = IoHash::HashBuffer(Buffer);
+ if (ExistingHash == ExpectedHash)
+ {
+ TmpBuffer.SetDeleteOnClose(true);
+ return TmpBuffer;
+ }
+ }
+ throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path));
+ }
+
+ IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path);
+ TmpBuffer.SetDeleteOnClose(true);
+ return TmpBuffer;
}
//////////////////////////////////////////////////////////////////////////
diff --git a/src/zencore/compositebuffer.cpp b/src/zencore/compositebuffer.cpp
index 49870a304..252ac9045 100644
--- a/src/zencore/compositebuffer.cpp
+++ b/src/zencore/compositebuffer.cpp
@@ -275,36 +275,18 @@ CompositeBuffer::IterateRange(uint64_t Offset,
Visitor(View, Segment);
break;
}
- if (Offset < SegmentSize)
+ else if (Offset <= SegmentSize)
{
- if (Offset == 0 && Size >= SegmentSize)
+ const MemoryView View = Segment.GetView().Mid(Offset, Size);
+ Offset = 0;
+ if (Size == 0 || !View.IsEmpty())
{
- const MemoryView View = Segment.GetView();
- if (!View.IsEmpty())
- {
- Visitor(View, Segment);
- }
- Size -= View.GetSize();
- if (Size == 0)
- {
- break;
- }
+ Visitor(View, Segment);
}
- else
+ Size -= View.GetSize();
+ if (Size == 0)
{
- // If we only want a section of the segment, do a subrange so we don't have to materialize the entire iobuffer
- IoBuffer SubRange(Segment.AsIoBuffer(), Offset, Min(Size, SegmentSize - Offset));
- const MemoryView View = SubRange.GetView();
- if (!View.IsEmpty())
- {
- Visitor(View, Segment);
- }
- Size -= View.GetSize();
- if (Size == 0)
- {
- break;
- }
- Offset = 0;
+ break;
}
}
else
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 29c1d9256..0e2ce2b54 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -2,6 +2,7 @@
#include <zencore/compress.h>
+#include <zencore/basicfile.h>
#include <zencore/blake3.h>
#include <zencore/compositebuffer.h>
#include <zencore/crc32.h>
@@ -314,37 +315,77 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
CompressedBlockSizes.reserve(BlockCount);
uint64_t CompressedSize = 0;
{
- UniqueBuffer RawBlockCopy;
MutableMemoryView CompressedBlocksView = CompressedData.GetMutableView() + sizeof(BufferHeader) + MetaSize;
- CompositeBuffer::Iterator It = RawData.GetIterator(0);
-
- for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
{
- const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
- const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
- RawHash.Append(RawBlock);
-
- MutableMemoryView CompressedBlock = CompressedBlocksView;
- if (!CompressBlock(CompressedBlock, RawBlock))
+ ZEN_ASSERT(FileRef.FileHandle != nullptr);
+ UniqueBuffer RawBlockCopy = UniqueBuffer::Alloc(BlockSize);
+ BasicFile Source;
+ Source.Attach(FileRef.FileHandle);
+ for (uint64_t RawOffset = 0; RawOffset < RawSize;)
{
- return CompositeBuffer();
- }
+ const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+ Source.Read(RawBlockCopy.GetData(), RawBlockSize, FileRef.FileChunkOffset + RawOffset);
+ const MemoryView RawBlock = RawBlockCopy.GetView().Left(RawBlockSize);
+ RawHash.Append(RawBlock);
+ MutableMemoryView CompressedBlock = CompressedBlocksView;
+ if (!CompressBlock(CompressedBlock, RawBlock))
+ {
+ Source.Detach();
+ return CompositeBuffer();
+ }
- uint64_t CompressedBlockSize = CompressedBlock.GetSize();
- if (RawBlockSize <= CompressedBlockSize)
- {
- CompressedBlockSize = RawBlockSize;
- CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock);
+ uint64_t CompressedBlockSize = CompressedBlock.GetSize();
+ if (RawBlockSize <= CompressedBlockSize)
+ {
+ CompressedBlockSize = RawBlockSize;
+ CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock);
+ }
+ else
+ {
+ CompressedBlocksView += CompressedBlockSize;
+ }
+
+ CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
+ CompressedSize += CompressedBlockSize;
+ RawOffset += RawBlockSize;
}
- else
+ Source.Detach();
+ }
+ else
+ {
+ UniqueBuffer RawBlockCopy;
+ CompositeBuffer::Iterator It = RawData.GetIterator(0);
+
+ for (uint64_t RawOffset = 0; RawOffset < RawSize;)
{
- CompressedBlocksView += CompressedBlockSize;
- }
+ const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+ const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
+ RawHash.Append(RawBlock);
- CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
- CompressedSize += CompressedBlockSize;
- RawOffset += RawBlockSize;
+ MutableMemoryView CompressedBlock = CompressedBlocksView;
+ if (!CompressBlock(CompressedBlock, RawBlock))
+ {
+ return CompositeBuffer();
+ }
+
+ uint64_t CompressedBlockSize = CompressedBlock.GetSize();
+ if (RawBlockSize <= CompressedBlockSize)
+ {
+ CompressedBlockSize = RawBlockSize;
+ CompressedBlocksView = CompressedBlocksView.CopyFrom(RawBlock);
+ }
+ else
+ {
+ CompressedBlocksView += CompressedBlockSize;
+ }
+
+ CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
+ CompressedSize += CompressedBlockSize;
+ RawOffset += RawBlockSize;
+ }
}
}
@@ -560,51 +601,118 @@ BlockDecoder::TryDecompressTo(const BufferHeader& Header,
CompressedOffset += CompressedBlockSize;
}
- for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++)
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if ((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
{
- const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize;
- const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
- const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize;
+ ZEN_ASSERT(FileRef.FileHandle != nullptr);
+ BasicFile Source;
+ Source.Attach(FileRef.FileHandle);
- const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock)
- : zen::Min(RemainingRawSize, BlockSize);
+ for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++)
+ {
+ const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize;
+ const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
+ const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize;
- MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy);
+ const uint64_t BytesToUncompress = OffsetInFirstBlock > 0
+ ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock)
+ : zen::Min(RemainingRawSize, BlockSize);
- if (IsCompressed)
- {
- MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress);
+ if (CompressedBlockCopy.GetSize() < CompressedBlockSize)
+ {
+ CompressedBlockCopy = UniqueBuffer::Alloc(CompressedBlockSize);
+ }
+ Source.Read(CompressedBlockCopy.GetData(), CompressedBlockSize, FileRef.FileChunkOffset + CompressedOffset);
- const bool IsAligned = BytesToUncompress == UncompressedBlockSize;
- if (!IsAligned)
+ MemoryView CompressedBlock = CompressedBlockCopy.GetView().Left(CompressedBlockSize);
+
+ if (IsCompressed)
{
- // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries.
- if (UncompressedBlockCopy.IsNull())
+ MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress);
+
+ const bool IsAligned = BytesToUncompress == UncompressedBlockSize;
+ if (!IsAligned)
{
- UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize);
+ // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries.
+ if (UncompressedBlockCopy.IsNull())
+ {
+ UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize);
+ }
+ UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize);
}
- UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize);
- }
- if (!DecompressBlock(UncompressedBlock, CompressedBlock))
- {
- return false;
- }
+ if (!DecompressBlock(UncompressedBlock, CompressedBlock))
+ {
+ Source.Detach();
+ return false;
+ }
- if (!IsAligned)
+ if (!IsAligned)
+ {
+ RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
+ }
+ }
+ else
{
- RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
+ RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
}
+
+ OffsetInFirstBlock = 0;
+ RemainingRawSize -= BytesToUncompress;
+ CompressedOffset += CompressedBlockSize;
+ RawView += BytesToUncompress;
}
- else
+ Source.Detach();
+ }
+ else
+ {
+ for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++)
{
- RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
- }
+ const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize;
+ const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
+ const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize;
- OffsetInFirstBlock = 0;
- RemainingRawSize -= BytesToUncompress;
- CompressedOffset += CompressedBlockSize;
- RawView += BytesToUncompress;
+ const uint64_t BytesToUncompress = OffsetInFirstBlock > 0
+ ? zen::Min(RawView.GetSize(), UncompressedBlockSize - OffsetInFirstBlock)
+ : zen::Min(RemainingRawSize, BlockSize);
+
+ MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy);
+
+ if (IsCompressed)
+ {
+ MutableMemoryView UncompressedBlock = RawView.Left(BytesToUncompress);
+
+ const bool IsAligned = BytesToUncompress == UncompressedBlockSize;
+ if (!IsAligned)
+ {
+ // Decompress to a temporary buffer when the first or the last block reads are not aligned with the block boundaries.
+ if (UncompressedBlockCopy.IsNull())
+ {
+ UncompressedBlockCopy = UniqueBuffer::Alloc(BlockSize);
+ }
+ UncompressedBlock = UncompressedBlockCopy.GetMutableView().Mid(0, UncompressedBlockSize);
+ }
+
+ if (!DecompressBlock(UncompressedBlock, CompressedBlock))
+ {
+ return false;
+ }
+
+ if (!IsAligned)
+ {
+ RawView.CopyFrom(UncompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
+ }
+ }
+ else
+ {
+ RawView.CopyFrom(CompressedBlock.Mid(OffsetInFirstBlock, BytesToUncompress));
+ }
+
+ OffsetInFirstBlock = 0;
+ RemainingRawSize -= BytesToUncompress;
+ CompressedOffset += CompressedBlockSize;
+ RawView += BytesToUncompress;
+ }
}
return RemainingRawSize == 0;
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index b8c35212f..5716d1255 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -683,7 +683,7 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop
{
}
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override
{
std::error_code Ec;
const std::filesystem::path Relative = std::filesystem::relative(Parent, BasePath, Ec);
@@ -1236,7 +1236,11 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr
}
else
{
- Visitor.VisitFile(RootDir, FileName, DirInfo->EndOfFile.QuadPart, gsl::narrow<uint32_t>(DirInfo->FileAttributes));
+ Visitor.VisitFile(RootDir,
+ FileName,
+ DirInfo->EndOfFile.QuadPart,
+ gsl::narrow<uint32_t>(DirInfo->FileAttributes),
+ (uint64_t)DirInfo->LastWriteTime.QuadPart);
}
const uint64_t NextOffset = DirInfo->NextEntryOffset;
@@ -1285,7 +1289,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr
}
else if (S_ISREG(Stat.st_mode))
{
- Visitor.VisitFile(RootDir, FileName, Stat.st_size, gsl::narrow<uint32_t>(Stat.st_mode));
+ Visitor.VisitFile(RootDir, FileName, Stat.st_size, gsl::narrow<uint32_t>(Stat.st_mode), gsl::narrow<uint64_t>(Stat.st_mtime));
}
else
{
@@ -1544,7 +1548,8 @@ GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags
virtual void VisitFile(const std::filesystem::path& Parent,
const path_view& File,
uint64_t FileSize,
- uint32_t NativeModeOrAttributes) override
+ uint32_t NativeModeOrAttributes,
+ uint64_t NativeModificationTick) override
{
if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles))
{
@@ -1557,6 +1562,10 @@ GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags
{
Content.FileAttributes.push_back(NativeModeOrAttributes);
}
+ if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeModificationTick))
+ {
+ Content.FileModificationTicks.push_back(NativeModificationTick);
+ }
}
}
@@ -1612,7 +1621,8 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
virtual void VisitFile(const std::filesystem::path&,
const path_view& File,
uint64_t FileSize,
- uint32_t NativeModeOrAttributes) override
+ uint32_t NativeModeOrAttributes,
+ uint64_t NativeModificationTick) override
{
if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles))
{
@@ -1625,6 +1635,10 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
{
Content.FileAttributes.push_back(NativeModeOrAttributes);
}
+ if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeModificationTick))
+ {
+ Content.FileModificationTicks.push_back(NativeModificationTick);
+ }
}
}
@@ -1928,7 +1942,7 @@ TEST_CASE("filesystem")
// Traversal
struct : public FileSystemTraversal::TreeVisitor
{
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t, uint64_t) override
{
bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected);
}
diff --git a/src/zencore/include/zencore/basicfile.h b/src/zencore/include/zencore/basicfile.h
index 7edd40c9c..a78132879 100644
--- a/src/zencore/include/zencore/basicfile.h
+++ b/src/zencore/include/zencore/basicfile.h
@@ -60,6 +60,7 @@ public:
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
void Write(MemoryView Data, uint64_t FileOffset);
void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec);
+ uint64_t Write(const CompositeBuffer& Data, uint64_t FileOffset);
uint64_t Write(const CompositeBuffer& Data, uint64_t FileOffset, std::error_code& Ec);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
@@ -174,6 +175,7 @@ public:
~BasicFileWriter();
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const CompositeBuffer& Data, uint64_t FileOffset);
void Flush();
private:
@@ -184,9 +186,7 @@ private:
uint64_t m_BufferEnd;
};
-IoBuffer WriteToTempFile(CompositeBuffer&& Buffer,
- const std::filesystem::path& Path,
- std::function<bool(std::error_code& Ec)>&& RetryCallback);
+IoBuffer WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path);
ZENCORE_API void basicfile_forcelink();
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index ca8682cd7..250745e86 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -203,7 +203,8 @@ public:
virtual void VisitFile(const std::filesystem::path& Parent,
const path_view& File,
uint64_t FileSize,
- uint32_t NativeModeOrAttributes) = 0;
+ uint32_t NativeModeOrAttributes,
+ uint64_t NativeModificationTick) = 0;
// This should return true if we should recurse into the directory
virtual bool VisitDirectory(const std::filesystem::path& Parent,
@@ -216,13 +217,14 @@ public:
enum class DirectoryContentFlags : uint8_t
{
- None = 0,
- IncludeDirs = 1u << 0,
- IncludeFiles = 1u << 1,
- Recursive = 1u << 2,
- IncludeFileSizes = 1u << 3,
- IncludeAttributes = 1u << 4,
- IncludeAllEntries = IncludeDirs | IncludeFiles | Recursive
+ None = 0,
+ IncludeDirs = 1u << 0,
+ IncludeFiles = 1u << 1,
+ Recursive = 1u << 2,
+ IncludeFileSizes = 1u << 3,
+ IncludeAttributes = 1u << 4,
+ IncludeModificationTick = 1u << 5,
+ IncludeAllEntries = IncludeDirs | IncludeFiles | Recursive
};
ENUM_CLASS_FLAGS(DirectoryContentFlags)
@@ -232,6 +234,7 @@ struct DirectoryContent
std::vector<std::filesystem::path> Files;
std::vector<uint64_t> FileSizes;
std::vector<uint32_t> FileAttributes;
+ std::vector<uint64_t> FileModificationTicks;
std::vector<std::filesystem::path> Directories;
std::vector<uint32_t> DirectoryAttributes;
};
@@ -246,6 +249,7 @@ public:
std::vector<std::filesystem::path> FileNames;
std::vector<uint64_t> FileSizes;
std::vector<uint32_t> FileAttributes;
+ std::vector<uint64_t> FileModificationTicks;
std::vector<std::filesystem::path> DirectoryNames;
std::vector<uint32_t> DirectoryAttributes;
};
diff --git a/src/zencore/include/zencore/iohash.h b/src/zencore/include/zencore/iohash.h
index 8871a5895..7443e17b7 100644
--- a/src/zencore/include/zencore/iohash.h
+++ b/src/zencore/include/zencore/iohash.h
@@ -47,8 +47,8 @@ struct IoHash
static IoHash HashBuffer(const void* data, size_t byteCount);
static IoHash HashBuffer(MemoryView Data) { return HashBuffer(Data.GetData(), Data.GetSize()); }
- static IoHash HashBuffer(const CompositeBuffer& Buffer);
- static IoHash HashBuffer(const IoBuffer& Buffer);
+ static IoHash HashBuffer(const CompositeBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes = nullptr);
+ static IoHash HashBuffer(const IoBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes = nullptr);
static IoHash FromHexString(const char* string);
static IoHash FromHexString(const std::string_view string);
static bool TryParse(std::string_view Str, IoHash& Hash);
diff --git a/src/zencore/iohash.cpp b/src/zencore/iohash.cpp
index 7200e6e3f..3b2af0db4 100644
--- a/src/zencore/iohash.cpp
+++ b/src/zencore/iohash.cpp
@@ -30,7 +30,7 @@ IoHash::HashBuffer(const void* data, size_t byteCount)
}
IoHash
-IoHash::HashBuffer(const CompositeBuffer& Buffer)
+IoHash::HashBuffer(const CompositeBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes)
{
IoHashStream Hasher;
@@ -46,11 +46,21 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer)
FileRef.FileChunkOffset,
FileRef.FileChunkSize,
BufferingSize,
- [&Hasher](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ [&Hasher, ProcessedBytes](const void* Data, size_t Size) {
+ Hasher.Append(Data, Size);
+ if (ProcessedBytes != nullptr)
+ {
+ ProcessedBytes->fetch_add(Size);
+ }
+ });
}
else
{
Hasher.Append(Segment.GetData(), SegmentSize);
+ if (ProcessedBytes != nullptr)
+ {
+ ProcessedBytes->fetch_add(SegmentSize);
+ }
}
}
@@ -58,7 +68,7 @@ IoHash::HashBuffer(const CompositeBuffer& Buffer)
}
IoHash
-IoHash::HashBuffer(const IoBuffer& Buffer)
+IoHash::HashBuffer(const IoBuffer& Buffer, std::atomic<uint64_t>* ProcessedBytes)
{
IoHashStream Hasher;
@@ -71,11 +81,21 @@ IoHash::HashBuffer(const IoBuffer& Buffer)
FileRef.FileChunkOffset,
FileRef.FileChunkSize,
BufferingSize,
- [&Hasher](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ [&Hasher, ProcessedBytes](const void* Data, size_t Size) {
+ Hasher.Append(Data, Size);
+ if (ProcessedBytes != nullptr)
+ {
+ ProcessedBytes->fetch_add(Size);
+ }
+ });
}
else
{
Hasher.Append(Buffer.GetData(), BufferSize);
+ if (ProcessedBytes != nullptr)
+ {
+ ProcessedBytes->fetch_add(BufferSize);
+ }
}
return Hasher.GetHash();
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 211a5d05c..bb15a6fce 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -449,12 +449,27 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile
}
static cpr::Response
-DoWithRetry(std::string_view SessionId, std::function<cpr::Response()>&& Func, uint8_t RetryCount)
+DoWithRetry(
+ std::string_view SessionId,
+ std::function<cpr::Response()>&& Func,
+ uint8_t RetryCount,
+ std::function<bool(cpr::Response& Result)>&& Validate = [](cpr::Response&) { return true; })
{
uint8_t Attempt = 0;
cpr::Response Result = Func();
- while (Attempt < RetryCount && ShouldRetry(Result))
+ while (Attempt < RetryCount)
{
+ if (!ShouldRetry(Result))
+ {
+ if (Result.error || !IsHttpSuccessCode(Result.status_code))
+ {
+ break;
+ }
+ if (Validate(Result))
+ {
+ break;
+ }
+ }
Sleep(100 * (Attempt + 1));
Attempt++;
ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
@@ -881,7 +896,11 @@ HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
return Sess.Get();
},
- m_ConnectionSettings.RetryCount));
+ m_ConnectionSettings.RetryCount,
+ [](cpr::Response& Result) {
+ std::unique_ptr<detail::TempPayloadFile> NoTempFile;
+ return ValidatePayload(Result, NoTempFile);
+ }));
}
HttpClient::Response
@@ -1247,7 +1266,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
{
uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- std::string Range = fmt::format("bytes={}-{}", DownloadedSize, ContentLength.value());
+ std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1);
if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
{
if (RangeIt->second == Range)
diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp
index b0212ab07..5d96de225 100644
--- a/src/zenserver/objectstore/objectstore.cpp
+++ b/src/zenserver/objectstore/objectstore.cpp
@@ -376,7 +376,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s
Writer.BeginArray("Contents"sv);
}
- void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
+ void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override
{
const fs::path FullPath = Parent / fs::path(File);
fs::path RelativePath = fs::relative(FullPath, BucketPath);
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
index 412769174..e4e91104c 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -123,18 +123,17 @@ public:
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
JupiterResult PutResult =
- Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
AddStats(PutResult);
SaveAttachmentResult Result{ConvertResult(PutResult)};
if (Result.ErrorCode)
{
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'",
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
RawHash,
Result.Reason);
return Result;
@@ -147,18 +146,16 @@ public:
IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer();
MetaPayload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutMetaResult =
- Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload);
+ JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload);
AddStats(PutMetaResult);
RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult);
if (MetaDataResult.ErrorCode)
{
- ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'",
+ ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
RawHash,
MetaDataResult.Reason);
}
@@ -311,30 +308,28 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
- JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
+ JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId);
AddStats(FindResult);
GetKnownBlocksResult Result{ConvertResult(FindResult)};
if (Result.ErrorCode)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
Result.Reason);
return Result;
}
if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv,
+ Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv,
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
- m_BuildId,
- m_OplogBuildPartId);
+ m_BuildId);
return Result;
}
std::optional<std::vector<ChunkBlockDescription>> Blocks =
@@ -342,12 +337,11 @@ public:
if (!Blocks)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a list of blocks"sv,
+ Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv,
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
- m_BuildId,
- m_OplogBuildPartId);
+ m_BuildId);
return Result;
}
Result.Blocks = std::move(Blocks.value());
@@ -358,18 +352,17 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
- JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath);
+ JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath);
AddStats(GetResult);
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
if (GetResult.ErrorCode)
{
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'",
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'",
m_JupiterClient->ServiceUrl(),
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
RawHash,
Result.Reason);
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 0285cc22f..b4b2c6fc4 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -1466,10 +1466,7 @@ BuildContainer(CidStore& ChunkStore,
};
std::vector<ChunkedFile> ChunkedFiles;
- auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash,
- IoBuffer& RawData,
- const IoBufferFileReference& FileRef,
- JobContext*) -> ChunkedFile {
+ auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile {
ChunkedFile Chunked;
Stopwatch Timer;
@@ -1578,22 +1575,7 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
- uint32_t RetriesLeft = 3;
-
- IoBuffer TempAttachmentBuffer =
- WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temporary attachment '{}': '{}', retries left: {}.",
- AttachmentPath,
- Ec.message(),
- RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
@@ -1612,34 +1594,21 @@ BuildContainer(CidStore& ChunkStore,
std::filesystem::path AttachmentPath = AttachmentTempPath;
AttachmentPath.append(RawHash.ToHexString());
- uint32_t RetriesLeft = 3;
- IoBuffer TempAttachmentBuffer =
- WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temporary attachment '{}': '{}', retries left: {}.",
- AttachmentPath,
- Ec.message(),
- RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
+ uint64_t CompressedSize = Compressed.GetCompressedSize();
+ IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath);
ZEN_INFO("Saved temp attachment to '{}', {} ({})",
AttachmentPath,
NiceBytes(RawSize),
NiceBytes(TempAttachmentBuffer.GetSize()));
- if (Compressed.GetCompressedSize() > MaxChunkEmbedSize)
+ if (CompressedSize > MaxChunkEmbedSize)
{
OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; });
ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); });
}
else
{
- UploadAttachment->Size = Compressed.GetCompressedSize();
+ UploadAttachment->Size = CompressedSize;
ResolveLock.WithExclusiveLock(
[RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
@@ -2362,17 +2331,7 @@ SaveOplog(CidStore& ChunkStore,
BlockPath.append(Block.BlockHash.ToHexString());
try
{
- uint32_t RetriesLeft = 3;
- IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath, [&](std::error_code& Ec) {
- if (RetriesLeft == 0)
- {
- return false;
- }
- ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
- RetriesLeft--;
- return true;
- });
+ IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath);
RwLock::ExclusiveLockScope __(AttachmentsLock);
CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}});
ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 14123528c..34db51aa9 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -185,7 +185,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
// in this folder as well
struct Visitor : public FileSystemTraversal::TreeVisitor
{
- virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t, uint64_t) override
{
// We don't care about files
}
@@ -1174,7 +1174,7 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
struct Visitor : public FileSystemTraversal::TreeVisitor
{
Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {}
- virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override
+ virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override
{
std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);
diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp
index c08492eb0..3f3a6661c 100644
--- a/src/zenutil/chunkedfile.cpp
+++ b/src/zenutil/chunkedfile.cpp
@@ -121,7 +121,7 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para
Chunker.SetUseThreshold(Params.UseThreshold);
Chunker.SetChunkSize(Params.MinSize, Params.MaxSize, Params.AvgSize);
size_t End = Offset + Size;
- const size_t ScanBufferSize = 1u * 1024 * 1024; // (Params.MaxSize * 9) / 3;//1 * 1024 * 1024;
+ const size_t ScanBufferSize = Max(1u * 1024 * 1024, Params.MaxSize);
BasicFileBuffer RawBuffer(RawData, ScanBufferSize);
MemoryView SliceView = RawBuffer.MakeView(Min(End - Offset, ScanBufferSize), Offset);
ZEN_ASSERT(!SliceView.IsEmpty());
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 075c35b40..852271868 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -116,21 +116,18 @@ public:
JupiterResult PutBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
ZenContentType ContentType,
const CompositeBuffer& Payload);
JupiterResult GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
std::filesystem::path TempFolderPath);
JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
ZenContentType ContentType,
uint64_t PayloadSize,
@@ -139,7 +136,6 @@ public:
JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
@@ -147,7 +143,6 @@ public:
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
const IoBuffer& Payload);
FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
@@ -155,12 +150,8 @@ public:
const Oid& BuildId,
const Oid& PartId,
const IoHash& RawHash);
- JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- JupiterResult GetBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- IoBuffer Payload);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload);
private:
inline LoggerRef Log() { return m_Log; }
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index d56927b44..06ac6ae36 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -436,15 +436,14 @@ JupiterResult
JupiterSession::PutBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
ZenContentType ContentType,
const CompositeBuffer& Payload)
{
- HttpClient::Response Response = m_HttpClient.Upload(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload,
- ContentType);
+ HttpClient::Response Response =
+ m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload,
+ ContentType);
return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
}
@@ -452,7 +451,6 @@ JupiterResult
JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
ZenContentType ContentType,
uint64_t PayloadSize,
@@ -498,12 +496,8 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
- std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/startMultipartUpload",
- Namespace,
- BucketId,
- BuildId,
- PartId,
- Hash.ToHexString());
+ std::string StartMultipartResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString());
// ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
HttpClient::Response StartMultipartResponse =
m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
@@ -529,15 +523,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
{
- OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, PartId, Hash, ContentType, Workload, PartIndex](
+ OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex](
bool& OutIsComplete) -> JupiterResult {
const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
- std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
Namespace,
BucketId,
BuildId,
- PartId,
Hash.ToHexString(),
Part.QueryString);
// ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
@@ -571,12 +564,7 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
CbObject CompletePayload = CompletePayloadWriter.Save();
std::string MultipartEndResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/completeMultipart",
- Namespace,
- BucketId,
- BuildId,
- PartId,
- Hash.ToHexString());
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString());
MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
CompletePayload,
@@ -600,13 +588,12 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
IoBuffer RetryPartPayload =
- Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte);
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
std::string RetryMultipartUploadResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
Namespace,
BucketId,
BuildId,
- RetryPartId,
Hash.ToHexString(),
RetryPart.QueryString);
@@ -642,14 +629,12 @@ JupiterResult
JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
std::vector<std::function<JupiterResult()>>& OutWorkItems)
{
- std::string RequestUrl =
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString());
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
HttpClient::Response Response =
m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
if (Response.IsSuccess())
@@ -683,17 +668,12 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
{
uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
OutWorkItems.emplace_back(
- [this, Namespace, BucketId, BuildId, PartId, Hash, TotalSize, Workload, Offset, PartSize]()
- -> JupiterResult {
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}",
- Namespace,
- BucketId,
- BuildId,
- PartId,
- Hash.ToHexString());
- HttpClient::Response Response = m_HttpClient.Get(
- RequestUrl,
- HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult {
+ std::string RequestUrl =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
if (Response.IsSuccess())
{
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
@@ -717,13 +697,12 @@ JupiterResult
JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
std::filesystem::path TempFolderPath)
{
- HttpClient::Response Response = m_HttpClient.Download(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- TempFolderPath);
+ HttpClient::Response Response =
+ m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ TempFolderPath);
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -732,14 +711,13 @@ JupiterResult
JupiterSession::PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
- const Oid& PartId,
const IoHash& Hash,
const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Put(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()),
+ Payload);
return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
}
@@ -772,24 +750,19 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace,
}
JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
{
- HttpClient::Response Response =
- m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
JupiterResult
-JupiterSession::GetBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- IoBuffer Payload)
+JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
HttpClient::Response Response =
- m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId, PartId),
+ m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId),
Payload,
HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp
index 6314c407f..0444fa2c4 100644
--- a/src/zenutil/logging.cpp
+++ b/src/zenutil/logging.cpp
@@ -10,6 +10,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <spdlog/spdlog.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <zencore/callstack.h>
#include <zencore/compactbinary.h>
#include <zencore/filesystem.h>
#include <zencore/logging.h>
@@ -97,7 +98,13 @@ BeginInitializeLogging(const LoggingOptions& LogOptions)
}
}
- std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); });
+ std::set_terminate([]() {
+ void* Frames[8];
+ uint32_t FrameCount = GetCallstack(2, 8, Frames);
+ CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
+ ZEN_CRITICAL("Program exited abnormally via std::terminate()\n{}", CallstackToString(Callstack, " "));
+ FreeCallstack(Callstack);
+ });
// Default