aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-06-11 15:19:16 +0200
committerStefan Boberg <[email protected]>2025-06-11 15:19:16 +0200
commit9a6f0b91802acb5fdfce30952fd7d3f49d55d51e (patch)
tree1eaac7f01c808d65a636c931d2a0fa1df7a1d184 /src
parentFixed accidental path copy (diff)
parentMerge branch 'main' into rpc-analyze (diff)
downloadzen-9a6f0b91802acb5fdfce30952fd7d3f49d55d51e.tar.xz
zen-9a6f0b91802acb5fdfce30952fd7d3f49d55d51e.zip
Merge branch 'rpc-analyze' of https://github.ol.epicgames.net/ue-foundation/zen into rpc-analyze
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp741
-rw-r--r--src/zen/cmds/builds_cmd.h29
-rw-r--r--src/zen/cmds/up_cmd.cpp2
-rw-r--r--src/zen/cmds/wipe_cmd.cpp12
-rw-r--r--src/zen/zen.cpp112
-rw-r--r--src/zen/zen.h21
-rw-r--r--src/zencore/filesystem.cpp230
-rw-r--r--src/zencore/include/zencore/filesystem.h10
-rw-r--r--src/zencore/include/zencore/process.h2
-rw-r--r--src/zencore/include/zencore/sentryintegration.h8
-rw-r--r--src/zencore/process.cpp64
-rw-r--r--src/zencore/sentryintegration.cpp7
-rw-r--r--src/zenhttp/httpclient.cpp2
-rw-r--r--src/zenhttp/httpclientauth.cpp6
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp7
-rw-r--r--src/zenserver/config.cpp2
-rw-r--r--src/zenserver/config.h51
-rw-r--r--src/zenserver/main.cpp6
-rw-r--r--src/zenserver/projectstore/projectstore.cpp11
-rw-r--r--src/zenstore/blockstore.cpp53
-rw-r--r--src/zenstore/buildstore/buildstore.cpp101
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp96
-rw-r--r--src/zenstore/compactcas.cpp309
-rw-r--r--src/zenstore/filecas.cpp12
-rw-r--r--src/zenstore/include/zenstore/blockstore.h6
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h7
-rw-r--r--src/zenutil/chunkedcontent.cpp24
-rw-r--r--src/zenutil/commandlineoptions.cpp20
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h17
-rw-r--r--src/zenutil/include/zenutil/commandlineoptions.h1
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h13
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp7
-rw-r--r--src/zenutil/parallelwork.cpp19
-rw-r--r--src/zenutil/zenserverprocess.cpp2
34 files changed, 1608 insertions, 402 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index fbcb6b900..abb4bfe0c 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -14,6 +14,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/trace.h>
@@ -37,6 +38,7 @@
#include <signal.h>
#include <memory>
+#include <regex>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -58,23 +60,205 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define ZEN_CLOUD_STORAGE "Cloud Storage"
namespace zen {
+
+using namespace std::literals;
+
namespace {
+ namespace zenutil {
+#if ZEN_PLATFORM_WINDOWS
+ class SecurityAttributes
+ {
+ public:
+ inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }
+
+ protected:
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+ };
+
+ // Security attributes which allows any user access
+
+ class AnyUserSecurityAttributes : public SecurityAttributes
+ {
+ public:
+ AnyUserSecurityAttributes()
+ {
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (Success)
+ {
+ if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
+ {
+ ThrowLastError("SetSecurityDescriptorDacl failed");
+ }
+
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+ }
+ };
+#endif // ZEN_PLATFORM_WINDOWS
+
+ } // namespace zenutil
+
static std::atomic<bool> AbortFlag = false;
- static void SignalCallbackHandler(int SigNum)
+ static std::atomic<bool> PauseFlag = false;
+
+ static void SignalCallbackHandler(int SigNum)
{
if (SigNum == SIGINT)
{
+ PauseFlag = false;
AbortFlag = true;
}
#if ZEN_PLATFORM_WINDOWS
if (SigNum == SIGBREAK)
{
+ PauseFlag = false;
AbortFlag = true;
}
#endif // ZEN_PLATFORM_WINDOWS
}
- using namespace std::literals;
+ struct ZenStateSharedData
+ {
+ static constexpr uint64_t kMagicV1 = 0x3176646d636e657a; // zencmdv1
+
+ uint64_t Magic = 0; // Implies the size and layout of this struct - changes to the data requires change to Magic constant
+ std::atomic<uint32_t> Pid;
+ uint8_t SessionId[12];
+ uint8_t Padding1[4];
+ std::atomic<uint8_t> Abort;
+ std::atomic<uint8_t> Pause;
+ uint8_t Padding2[2];
+ };
+
+ struct MemMap
+ {
+ void* Handle = nullptr;
+ void* Data = nullptr;
+ size_t Size = 0;
+ std::string Name;
+ };
+
+ class ZenState
+ {
+ public:
+ ZenState(const ZenState&) = delete;
+ ZenState& operator=(const ZenState&) = delete;
+
+ ZenState();
+ explicit ZenState(uint32_t Pid);
+ ~ZenState();
+
+ const ZenStateSharedData& StateData() const
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+ ZenStateSharedData& StateData()
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+
+ private:
+ static constexpr std::string_view MapBaseName = "UnrealEngineZenCmd_"sv;
+ static constexpr size_t MapSize = sizeof(ZenStateSharedData);
+
+ bool m_Created = false;
+ std::unique_ptr<SharedMemory> m_MemMap;
+ ZenStateSharedData* m_Data = nullptr;
+
+ std::thread m_StateMonitor;
+ Event m_ExitStateMonitorEvent;
+ };
+
+ ZenState::ZenState(uint32_t Pid)
+ {
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ if (!IsProcessRunning(Pid))
+ {
+ throw std::runtime_error(fmt::format("The process {} is not running", Pid));
+ }
+ std::unique_ptr<SharedMemory> MemMap = OpenSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The process {} is not a running zen process", Pid));
+ }
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ if (uint64_t MemMagic = data->Magic; MemMagic != ZenStateSharedData::kMagicV1)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} has an unsupported magic {:x}, expected {:x}",
+ Pid,
+ MemMagic,
+ ZenStateSharedData::kMagicV1));
+ }
+ if (uint32_t MemPid = data->Pid.load(); MemPid != Pid)
+ {
+ throw std::runtime_error(
+ fmt::format("The mem map for process {} has an missmatching pid of {}, expected {}", Pid, MemPid, Pid));
+ }
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+ }
+
+ ZenState::ZenState()
+ {
+ int Pid = GetCurrentProcessId();
+
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ std::unique_ptr<SharedMemory> MemMap = CreateSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} could not be created", Pid));
+ }
+
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ memset(data, 0, sizeof(ZenStateSharedData));
+ data->Magic = ZenStateSharedData::kMagicV1;
+ data->Pid.store(gsl::narrow<uint32_t>(Pid));
+ data->SessionId;
+ data->Abort.store(false);
+ data->Pause.store(false);
+ const Oid SessionId = GetSessionId();
+ memcpy(data->SessionId, &SessionId, sizeof SessionId);
+
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+
+ m_StateMonitor = std::thread([this]() {
+ while (!m_ExitStateMonitorEvent.Wait(500))
+ {
+ if (m_Data->Abort.load())
+ {
+ AbortFlag.store(true);
+ }
+ PauseFlag.store(m_Data->Pause.load());
+ }
+ });
+ }
+
+ ZenState::~ZenState()
+ {
+ try
+ {
+ if (m_StateMonitor.joinable())
+ {
+ m_ExitStateMonitorEvent.Set();
+ m_StateMonitor.join();
+ }
+ m_MemMap.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ZenState::~ZenState threw exception: {}", Ex.what());
+ }
+ }
static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
@@ -434,7 +618,7 @@ namespace {
std::atomic<uint64_t> DiscoveredItemCount = 0;
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -548,8 +732,8 @@ namespace {
uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
LastUpdateTimeMs = Timer.GetElapsedTimeMs();
uint64_t Deleted = DeletedItemCount.load();
@@ -558,7 +742,8 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -597,12 +782,17 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
}
}
Progress.Finish();
+ if (AbortFlag)
+ {
+ return false;
+ }
uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
if (ElapsedTimeMs >= 200)
@@ -1953,7 +2143,7 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -2116,8 +2306,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
@@ -2140,7 +2330,8 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2396,7 +2587,7 @@ namespace {
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
@@ -2566,8 +2757,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
@@ -2586,7 +2777,8 @@ namespace {
{.Task = "Generating blocks",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
- .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2625,7 +2817,7 @@ namespace {
FilteredRate FilteredCompressedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<size_t> UploadedBlockSize = 0;
std::atomic<size_t> UploadedBlockCount = 0;
@@ -3005,8 +3197,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
@@ -3034,13 +3226,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Uploading blobs ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
- .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)},
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
+
UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
@@ -3492,7 +3686,7 @@ namespace {
Content,
*ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -3505,16 +3699,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = TotalRawSize,
- .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return;
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
@@ -4265,7 +4461,7 @@ namespace {
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
@@ -4413,8 +4609,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
VerifyFolderStats.FilesVerified.load(),
PathCount,
@@ -4423,12 +4619,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
+
for (const std::string& Error : Errors)
{
ZEN_CONSOLE("{}", Error);
@@ -5360,7 +5562,7 @@ namespace {
Stopwatch Timer;
auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> CompletedPathCount = 0;
uint32_t PathIndex = 0;
@@ -5391,7 +5593,7 @@ namespace {
});
PathIndex += PathRangeCount;
}
- Work.Wait(200, [&](bool, ptrdiff_t) {
+ Work.Wait(200, [&](bool, bool, ptrdiff_t) {
if (ProgressCallback)
{
ProgressCallback(PathCount, CompletedPathCount.load());
@@ -5669,7 +5871,7 @@ namespace {
ScavengedPaths.resize(ScavengePathCount);
ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> PathsFound(0);
std::atomic<uint64_t> ChunksFound(0);
@@ -5799,8 +6001,8 @@ namespace {
{
ZEN_TRACE_CPU("ScavengeScan_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavanging",
PathsScavenged.load(),
ScavengePathCount,
@@ -5809,11 +6011,17 @@ namespace {
ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
.Details = Details,
.TotalCount = ScavengePathCount,
- .RemainingCount = ScavengePathCount - PathsScavenged.load()},
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+
ScavengeProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
for (uint32_t ScavengedContentIndex = 0;
ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
@@ -6129,7 +6337,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct LooseChunkHashWorkData
{
@@ -7582,8 +7790,8 @@ namespace {
{
ZEN_TRACE_CPU("WriteChunks_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
@@ -7609,7 +7817,8 @@ namespace {
.Details = Details,
.TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
.RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
- : (BytesToWrite - DiskStats.WriteByteCount.load())},
+ : (BytesToWrite - DiskStats.WriteByteCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
@@ -7617,13 +7826,12 @@ namespace {
FilteredWrittenBytesPerSecond.Stop();
FilteredDownloadedBytesPerSecond.Stop();
+ WriteProgressBar.Finish();
if (AbortFlag)
{
return;
}
- WriteProgressBar.Finish();
-
if (!PrimeCacheOnly)
{
uint32_t RawSequencesMissingWriteCount = 0;
@@ -7770,7 +7978,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t LocalPathIndex : FilesToCache)
{
@@ -7799,26 +8007,26 @@ namespace {
{
ZEN_TRACE_CPU("CacheLocal_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = FilesToCache.size();
const uint64_t WorkComplete = CachedCount.load();
std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
CacheLocalProgressBar.UpdateState({.Task = "Caching local ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+ CacheLocalProgressBar.Finish();
if (AbortFlag)
{
return;
}
- CacheLocalProgressBar.Finish();
-
ZEN_DEBUG(
"Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
"Delete: {}",
@@ -7857,7 +8065,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
@@ -8108,26 +8316,21 @@ namespace {
{
ZEN_TRACE_CPU("FinalizeTree_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
-
- if (AbortFlag)
- {
- return;
- }
-
RebuildProgressBar.Finish();
}
}
@@ -8689,10 +8892,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Checking files ",
.Details = Details,
.TotalCount = PathCount,
- .RemainingCount = PathCount - CompletedPathCount},
+ .RemainingCount = PathCount - CompletedPathCount,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
});
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return {};
+ }
}
bool ScanContent = true;
@@ -8730,7 +8938,7 @@ namespace {
UpdatedContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8743,16 +8951,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}});
}
}
@@ -8798,7 +9009,7 @@ namespace {
OutLocalFolderContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8811,18 +9022,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())},
+ .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
-
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
return LocalContent;
}
@@ -9433,7 +9645,6 @@ BuildsCommand::BuildsCommand()
auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) {
AddAuthOptions(Ops);
-
Ops.add_option("cloud build", "", "override-host", "Cloud Builds URL", cxxopts::value(m_OverrideHost), "<override-host>");
Ops.add_option("cloud build",
"",
@@ -9441,6 +9652,7 @@ BuildsCommand::BuildsCommand()
"Cloud Builds host url (legacy - use --override-host)",
cxxopts::value(m_OverrideHost),
"<url>");
+ Ops.add_option("cloud build", "", "cloud-url", "Cloud Artifact URL", cxxopts::value(m_Url), "<cloud-url>");
Ops.add_option("cloud build", "", "host", "Cloud Builds host", cxxopts::value(m_Host), "<host>");
Ops.add_option("cloud build",
"",
@@ -9745,6 +9957,21 @@ BuildsCommand::BuildsCommand()
m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"});
m_FetchBlobOptions.positional_help("build-id blob-hash");
+ auto AddZenProcessOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>");
+ };
+ AddZenProcessOptions(m_PauseOptions);
+ m_PauseOptions.parse_positional({"process-id"});
+ m_PauseOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_ResumeOptions);
+ m_ResumeOptions.parse_positional({"process-id"});
+ m_ResumeOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_AbortOptions);
+ m_AbortOptions.parse_positional({"process-id"});
+ m_AbortOptions.positional_help("process-id");
+
AddSystemOptions(m_ValidateBuildPartOptions);
AddCloudOptions(m_ValidateBuildPartOptions);
AddFileOptions(m_ValidateBuildPartOptions);
@@ -9829,22 +10056,75 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
ParseSystemOptions();
- auto ParseStorageOptions = [&]() {
+ auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) {
+ m_Host = RemoveQuotes(m_Host);
+ m_OverrideHost = RemoveQuotes(m_OverrideHost);
+ m_Url = RemoveQuotes(m_Url);
+ m_Namespace = RemoveQuotes(m_Namespace);
+ m_Bucket = RemoveQuotes(m_Bucket);
+ if (!m_Url.empty())
+ {
+ if (!m_Host.empty())
+ {
+ throw zen::OptionParseException(fmt::format("host is not compatible with the url option\n{}", SubOption->help()));
+ }
+ if (!m_Bucket.empty())
+ {
+ throw zen::OptionParseException(fmt::format("bucket is not compatible with the url option\n{}", SubOption->help()));
+ }
+ if (!m_BuildId.empty())
+ {
+ throw zen::OptionParseException(fmt::format("buildid is not compatible with the url option\n{}", SubOption->help()));
+ }
+ const std::string ArtifactURLRegExString = R"((.*?:\/\/.*?)\/api\/v2\/builds\/(.*?)\/(.*?)\/(.*))";
+ const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript);
+ std::match_results<std::string_view::const_iterator> MatchResults;
+ const std::string_view Url(RemoveQuotes(m_Url));
+ if (regex_match(begin(Url), end(Url), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5)
+ {
+ auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view {
+ ZEN_ASSERT(Index < MatchResults.size());
+
+ const auto& Match = MatchResults[Index];
+
+ return std::string_view(&*Match.first, Match.second - Match.first);
+ };
+
+ const std::string_view Host = GetMatch(1);
+ const std::string_view Namespace = GetMatch(2);
+ const std::string_view Bucket = GetMatch(3);
+ const std::string_view BuildId = GetMatch(4);
+
+ m_Host = Host;
+ m_Namespace = Namespace;
+ m_Bucket = Bucket;
+ m_BuildId = BuildId;
+ }
+ else
+ {
+ throw zen::OptionParseException(fmt::format("url does not match the Cloud Artifact URL format\n{}", SubOption->help()));
+ }
+ }
+
if (!m_OverrideHost.empty() || !m_Host.empty())
{
if (!m_StoragePath.empty())
{
- throw zen::OptionParseException(fmt::format("url is not compatible with the storage-path option\n{}", m_Options.help()));
+ throw zen::OptionParseException(
+ fmt::format("host/url/override-host is not compatible with the storage-path option\n{}", SubOption->help()));
}
- if (SubOption != &m_ListNamespacesOptions && (m_Namespace.empty() || m_Bucket.empty()))
+ if (RequireNamespace && m_Namespace.empty())
{
- throw zen::OptionParseException(
- fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help()));
+ throw zen::OptionParseException(fmt::format("namespace option is required for this storage option\n{}", SubOption->help()));
+ }
+ if (RequireBucket && m_Bucket.empty())
+ {
+ throw zen::OptionParseException(fmt::format("bucket option is required for this storage option\n{}", SubOption->help()));
}
}
else if (m_StoragePath.empty())
{
- throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", SubOption->help()));
}
MakeSafeAbsolutePathÍnPlace(m_StoragePath);
};
@@ -9887,6 +10167,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
auto ParseAuthOptions = [&]() {
+ m_OpenIdProviderUrl = RemoveQuotes(m_OpenIdProviderUrl);
+ m_OpenIdClientId = RemoveQuotes(m_OpenIdClientId);
+
+ m_AccessToken = RemoveQuotes(m_AccessToken);
+ m_EncryptionKey = RemoveQuotes(m_EncryptionKey);
+ m_EncryptionIV = RemoveQuotes(m_EncryptionIV);
+
+ m_OAuthUrl = RemoveQuotes(m_OAuthUrl);
+ m_OAuthClientId = RemoveQuotes(m_OAuthClientId);
+ m_OAuthClientSecret = RemoveQuotes(m_OAuthClientSecret);
+
+ m_OidcTokenAuthExecutablePath = RemoveQuotes(m_OidcTokenAuthExecutablePath);
+
if (!m_OpenIdProviderUrl.empty() && !m_OpenIdClientId.empty())
{
CreateAuthMgr();
@@ -9986,8 +10279,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats,
BuildStorageCache::Statistics& StorageCacheStats,
- const std::filesystem::path& TempPath) -> StorageInstance {
- ParseStorageOptions();
+ const std::filesystem::path& TempPath,
+ bool RequireNamespace,
+ bool RequireBucket) -> StorageInstance {
+ ParseStorageOptions(RequireNamespace, RequireBucket);
+
+ m_ZenCacheHost = RemoveQuotes(m_ZenCacheHost);
StorageInstance Result;
@@ -10198,7 +10495,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
- throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", SubOption->help()));
}
if (!m_ZenCacheHost.empty())
{
@@ -10241,7 +10538,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto ParsePath = [&]() {
if (m_Path.empty())
{
- throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("local-path is required\n{}", SubOption->help()));
}
MakeSafeAbsolutePathÍnPlace(m_Path);
};
@@ -10249,34 +10546,36 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
auto ParseDiffPath = [&]() {
if (m_DiffPath.empty())
{
- throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help()));
+ throw zen::OptionParseException(fmt::format("compare-path is required\n{}", SubOption->help()));
}
MakeSafeAbsolutePathÍnPlace(m_DiffPath);
};
auto ParseBlobHash = [&]() -> IoHash {
+ m_BlobHash = RemoveQuotes(m_BlobHash);
if (m_BlobHash.empty())
{
- throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", SubOption->help()));
}
IoHash BlobHash;
if (!IoHash::TryParse(m_BlobHash, BlobHash))
{
- throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", SubOption->help()));
}
return BlobHash;
};
auto ParseBuildId = [&]() -> Oid {
+ m_BuildId = RemoveQuotes(m_BuildId);
if (m_BuildId.length() != Oid::StringLength)
{
- throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Invalid build id\n{}", SubOption->help()));
}
else if (Oid BuildId = Oid::FromHexString(m_BuildId); BuildId == Oid::Zero)
{
- throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Invalid build id\n{}", SubOption->help()));
}
else
{
@@ -10285,13 +10584,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
};
auto ParseBuildPartId = [&]() -> Oid {
+ m_BuildPartId = RemoveQuotes(m_BuildPartId);
if (m_BuildPartId.length() != Oid::StringLength)
{
- throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", SubOption->help()));
}
else if (Oid BuildPartId = Oid::FromHexString(m_BuildPartId); BuildPartId == Oid::Zero)
{
- throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", SubOption->help()));
}
else
{
@@ -10303,25 +10603,38 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::vector<Oid> BuildPartIds;
for (const std::string& BuildPartId : m_BuildPartIds)
{
- BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId));
+ BuildPartIds.push_back(Oid::TryFromHexString(RemoveQuotes(BuildPartId)));
if (BuildPartIds.back() == Oid::Zero)
{
- throw zen::OptionParseException(fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help()));
+ throw zen::OptionParseException(fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, SubOption->help()));
}
}
return BuildPartIds;
};
+ auto ParseBuildPartNames = [&]() -> std::vector<std::string> {
+ std::vector<std::string> BuildPartNames;
+ for (const std::string& BuildPartName : m_BuildPartNames)
+ {
+ BuildPartNames.push_back(std::string(RemoveQuotes(BuildPartName)));
+ if (BuildPartNames.back().empty())
+ {
+ throw zen::OptionParseException(fmt::format("build-part-names '{}' is invalid\n{}", BuildPartName, SubOption->help()));
+ }
+ }
+ return BuildPartNames;
+ };
+
auto ParseBuildMetadata = [&]() -> CbObject {
if (m_CreateBuild)
{
if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty())
{
- throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", SubOption->help()));
}
if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty())
{
- throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", SubOption->help()));
}
if (!m_BuildMetadataPath.empty())
@@ -10338,6 +10651,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
return MetaData;
}
+ m_BuildMetadata = RemoveQuotes(m_BuildMetadata);
if (!m_BuildMetadata.empty())
{
CbObjectWriter MetaDataWriter(1024);
@@ -10358,12 +10672,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!m_BuildMetadataPath.empty())
{
throw zen::OptionParseException(
- fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help()));
+ fmt::format("metadata-path option is only valid if creating a build\n{}", SubOption->help()));
}
if (!m_BuildMetadata.empty())
{
- throw zen::OptionParseException(
- fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help()));
+ throw zen::OptionParseException(fmt::format("metadata option is only valid if creating a build\n{}", SubOption->help()));
}
}
return {};
@@ -10378,7 +10691,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
BuildStorage::Statistics StorageStats;
@@ -10396,7 +10712,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(ZenFolderPath),
+ /*RequriesNamespace*/ false,
+ /*RequireBucket*/ false);
CbObject Response = Storage.BuildStorage->ListNamespaces(m_ListNamespacesRecursive);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None);
@@ -10432,7 +10752,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
CbObject QueryObject;
if (m_ListQueryPath.empty())
@@ -10480,7 +10803,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ false);
CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject);
ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None);
@@ -10510,25 +10837,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
-
- ParsePath();
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
- if (m_BuildPartName.empty())
- {
- m_BuildPartName = m_Path.filename().string();
- }
+ ZenState InstanceState;
- const Oid BuildId = m_BuildId.empty() ? Oid::NewOid() : ParseBuildId();
- if (m_BuildId.empty())
- {
- m_BuildId = BuildId.ToString();
- }
- const Oid BuildPartId = m_BuildPartId.empty() ? Oid::NewOid() : ParseBuildPartId();
- if (m_BuildPartId.empty())
- {
- m_BuildPartId = BuildPartId.ToString();
- }
+ ParsePath();
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10548,11 +10861,33 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
+
+ if (m_BuildPartName.empty())
+ {
+ m_BuildPartName = m_Path.filename().string();
+ }
+
+ const Oid BuildId = m_BuildId.empty() ? Oid::NewOid() : ParseBuildId();
+ if (m_BuildId.empty())
+ {
+ m_BuildId = BuildId.ToString();
+ }
+ const Oid BuildPartId = m_BuildPartId.empty() ? Oid::NewOid() : ParseBuildPartId();
+ if (m_BuildPartId.empty())
+ {
+ m_BuildPartId = BuildPartId.ToString();
+ }
CbObject MetaData = ParseBuildMetadata();
- const std::filesystem::path TempDir = UploadTempDirectory(m_Path);
+ const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath);
+
+ m_ManifestPath = RemoveQuotes(m_ManifestPath);
UploadFolder(Storage,
BuildId,
@@ -10594,16 +10929,33 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
ParsePath();
+ if (m_ZenFolderPath.empty())
+ {
+ m_ZenFolderPath = m_Path / ZenFolderName;
+ }
+ MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
+
+ BuildStorage::Statistics StorageStats;
+ BuildStorageCache::Statistics StorageCacheStats;
+
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
+
const Oid BuildId = ParseBuildId();
if (m_PostDownloadVerify && m_PrimeCacheOnly)
{
throw zen::OptionParseException(
- fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help()));
+ fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", SubOption->help()));
}
if (m_Clean && m_PrimeCacheOnly)
@@ -10616,23 +10968,13 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled");
}
- std::vector<Oid> BuildPartIds = ParseBuildPartIds();
-
- if (m_ZenFolderPath.empty())
- {
- m_ZenFolderPath = m_Path / ZenFolderName;
- }
- MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
-
- BuildStorage::Statistics StorageStats;
- BuildStorageCache::Statistics StorageCacheStats;
-
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ std::vector<Oid> BuildPartIds = ParseBuildPartIds();
+ std::vector<std::string> BuildPartNames = ParseBuildPartNames();
DownloadFolder(Storage,
BuildId,
BuildPartIds,
- m_BuildPartNames,
+ BuildPartNames,
m_Path,
m_ZenFolderPath,
m_SystemRootDir,
@@ -10656,10 +10998,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
- IoHash BlobHash = ParseBlobHash();
-
- const Oid BuildId = Oid::FromHexString(m_BuildId);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10679,7 +11018,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
+
+ IoHash BlobHash = ParseBlobHash();
+
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
uint64_t CompressedSize;
uint64_t DecompressedSize;
@@ -10697,14 +11044,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
- Oid BuildId = ParseBuildId();
-
- if (!m_BuildPartName.empty() && !m_BuildPartId.empty())
- {
- throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help()));
- }
+ ZenState InstanceState;
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10724,7 +11066,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
});
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
+
+ Oid BuildId = ParseBuildId();
+
+ if (!m_BuildPartName.empty() && !m_BuildPartId.empty())
+ {
+ throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", SubOption->help()));
+ }
const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId();
@@ -10753,15 +11106,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
Stopwatch Timer;
for (const std::string& BuildIdString : m_BuildIds)
{
- Oid BuildId = Oid::FromHexString(BuildIdString);
+ Oid BuildId = Oid::FromHexString(RemoveQuotes(BuildIdString));
if (BuildId == Oid::Zero)
{
- throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help()));
+ throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, SubOption->help()));
}
DownloadFolder(Storage,
BuildId,
@@ -10787,6 +11144,50 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
+ auto ParseZenProcessId = [&]() {
+ if (m_ZenProcessId == -1)
+ {
+ const std::filesystem::path RunningExecutablePath = GetRunningExecutablePath();
+ ProcessHandle RunningProcess;
+ std::error_code Ec = FindProcess(RunningExecutablePath, RunningProcess, /*IncludeSelf*/ false);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finding process running '{}', reason: '{}'", RunningExecutablePath, Ec.message()));
+ }
+ if (!RunningProcess.IsValid())
+ {
+ throw zen::OptionParseException(
+ fmt::format("Unable to find a running instance of the zen executable '{}'", RunningExecutablePath));
+ }
+ m_ZenProcessId = RunningProcess.Pid();
+ }
+ };
+
+ if (SubOption == &m_PauseOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(true);
+ return 0;
+ }
+
+ if (SubOption == &m_ResumeOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(false);
+ return 0;
+ }
+
+ if (SubOption == &m_AbortOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Abort.store(true);
+ return 0;
+ }
+
if (SubOption == &m_TestOptions)
{
m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred();
@@ -10796,14 +11197,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ParsePath();
- m_BuildId = Oid::NewOid().ToString();
- m_BuildPartName = m_Path.filename().string();
- m_BuildPartId = Oid::NewOid().ToString();
- m_CreateBuild = true;
-
- const Oid BuildId = Oid::FromHexString(m_BuildId);
- const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
-
if (m_OverrideHost.empty() && m_StoragePath.empty())
{
m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred();
@@ -10838,7 +11231,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
- StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath));
+ StorageInstance Storage = CreateBuildStorage(StorageStats,
+ StorageCacheStats,
+ ZenTempFolderPath(m_ZenFolderPath),
+ /*RequriesNamespace*/ true,
+ /*RequireBucket*/ true);
+
+ m_BuildId = Oid::NewOid().ToString();
+ m_BuildPartName = m_Path.filename().string();
+ m_BuildPartId = Oid::NewOid().ToString();
+ m_CreateBuild = true;
+
+ const Oid BuildId = Oid::FromHexString(m_BuildId);
+ const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);
auto MakeMetaData = [](const Oid& BuildId) -> CbObject {
CbObjectWriter BuildMetaDataWriter;
@@ -10955,7 +11360,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return true;
};
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
uint32_t Randomizer = 0;
auto FileSizeIt = DownloadContent.FileSizes.begin();
@@ -11013,8 +11418,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
FileSizeIt++;
}
- Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(5000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork);
});
ZEN_ASSERT(!AbortFlag.load());
@@ -11161,6 +11566,24 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
}
+ catch (const std::system_error& SysErr)
+ {
+ if (IsOOD(SysErr))
+ {
+ ZEN_CONSOLE("Operation failed due to out of disk space: {}", SysErr.what());
+ return 3;
+ }
+ else if (IsOOM(SysErr))
+ {
+ ZEN_CONSOLE("Operation failed due to out of memory: {}", SysErr.what());
+ return 3;
+ }
+ else
+ {
+ ZEN_ERROR("{}", SysErr.what());
+ return 3;
+ }
+ }
catch (const std::exception& Ex)
{
ZEN_ERROR("{}", Ex.what());
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 41ed65105..b3466c0d3 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -38,6 +38,7 @@ private:
// cloud builds
std::string m_OverrideHost;
std::string m_Host;
+ std::string m_Url;
bool m_AssumeHttp2 = false;
bool m_AllowRedirect = false;
std::string m_Namespace;
@@ -88,7 +89,6 @@ private:
std::string m_Verb; // list, upload, download
cxxopts::Options m_ListNamespacesOptions{"list-namespaces", "List available build namespaces"};
- std::string m_ListNamespacesResultPath;
bool m_ListNamespacesRecursive = false;
cxxopts::Options m_ListOptions{"list", "List available builds"};
@@ -114,6 +114,12 @@ private:
cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"};
std::string m_BlobHash;
+ cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"};
+ cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"};
+ cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"};
+
+ int m_ZenProcessId = -1;
+
cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"};
cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"};
@@ -121,15 +127,18 @@ private:
cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"};
std::vector<std::string> m_BuildIds;
- cxxopts::Options* m_SubCommands[9] = {&m_ListNamespacesOptions,
- &m_ListOptions,
- &m_UploadOptions,
- &m_DownloadOptions,
- &m_DiffOptions,
- &m_FetchBlobOptions,
- &m_ValidateBuildPartOptions,
- &m_TestOptions,
- &m_MultiTestDownloadOptions};
+ cxxopts::Options* m_SubCommands[12] = {&m_ListNamespacesOptions,
+ &m_ListOptions,
+ &m_UploadOptions,
+ &m_DownloadOptions,
+ &m_PauseOptions,
+ &m_ResumeOptions,
+ &m_AbortOptions,
+ &m_DiffOptions,
+ &m_FetchBlobOptions,
+ &m_ValidateBuildPartOptions,
+ &m_TestOptions,
+ &m_MultiTestDownloadOptions};
};
} // namespace zen
diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp
index f3bf2f66d..d0763701e 100644
--- a/src/zen/cmds/up_cmd.cpp
+++ b/src/zen/cmds/up_cmd.cpp
@@ -311,7 +311,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// Try to find the running executable by path name
std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
- if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec)
+ if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec, /*IncludeSelf*/ false)
{
ZEN_WARN("attempting hard terminate of zen process with pid ({})", RunningProcess.Pid());
try
diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp
index 9dfdca0a1..fcc18df2b 100644
--- a/src/zen/cmds/wipe_cmd.cpp
+++ b/src/zen/cmds/wipe_cmd.cpp
@@ -33,6 +33,7 @@ namespace zen {
namespace {
static std::atomic<bool> AbortFlag = false;
+ static std::atomic<bool> PauseFlag = false;
static bool IsVerbose = false;
static bool Quiet = false;
static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
@@ -72,11 +73,13 @@ namespace {
{
if (SigNum == SIGINT)
{
+ PauseFlag = false;
AbortFlag = true;
}
#if ZEN_PLATFORM_WINDOWS
if (SigNum == SIGBREAK)
{
+ PauseFlag = false;
AbortFlag = true;
}
#endif // ZEN_PLATFORM_WINDOWS
@@ -248,7 +251,7 @@ namespace {
return Added;
};
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -423,12 +426,12 @@ namespace {
GetIOWorkerPool(),
Work.PendingWork());
- Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, ptrdiff_t PendingWork) {
+ Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
if (Quiet)
{
return;
}
- ZEN_UNUSED(IsAborted, PendingWork);
LastUpdateTimeMs = Timer.GetElapsedTimeMs();
uint64_t Deleted = DeletedItemCount.load();
@@ -437,7 +440,8 @@ namespace {
Progress.UpdateState({.Task = "Removing files ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 56c5bcc09..614112235 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -376,7 +376,8 @@ ProgressBar::SetLogOperationProgress(Mode InMode, uint32_t StepIndex, uint32_t S
ProgressBar::ProgressBar(Mode InMode, std::string_view InSubTask)
: m_Mode((!IsStdoutTty() && InMode == Mode::Pretty) ? Mode::Plain : InMode)
-, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000)
+, m_LastUpdateMS((uint64_t)-1)
+, m_PausedMS(0)
, m_SubTask(InSubTask)
{
if (!m_SubTask.empty() && InMode == Mode::Log)
@@ -413,20 +414,46 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
}
uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs();
- if (!DoLinebreak && (NewState.Task == m_State.Task) && ((m_LastUpdateMS + 200) > ElapsedTimeMS))
+ if (m_LastUpdateMS != (uint64_t)-1)
{
- return;
+ if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) &&
+ ((m_LastUpdateMS + 200) > ElapsedTimeMS))
+ {
+ return;
+ }
+ if (m_State.Status == State::EStatus::Paused)
+ {
+ uint64_t ElapsedSinceLast = ElapsedTimeMS - m_LastUpdateMS;
+ m_PausedMS += ElapsedSinceLast;
+ }
}
m_LastUpdateMS = ElapsedTimeMS;
+ std::string Task = NewState.Task;
+ switch (NewState.Status)
+ {
+ case State::EStatus::Aborted:
+ Task = "Aborting";
+ break;
+ case State::EStatus::Paused:
+ Task = "Paused";
+ break;
+ default:
+ break;
+ }
+ if (NewState.Task.length() > Task.length())
+ {
+ Task += std::string(NewState.Task.length() - Task.length(), ' ');
+ }
+
const size_t PercentDone =
NewState.TotalCount > 0u ? gsl::narrow<uint8_t>((100 * (NewState.TotalCount - NewState.RemainingCount)) / NewState.TotalCount) : 0u;
if (m_Mode == Mode::Plain)
{
const std::string Details = (!NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : "";
- const std::string Output = fmt::format("{} {}% ({}){}\n", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details);
+ const std::string Output = fmt::format("{} {}% ({}){}\n", Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details);
OutputToConsoleRaw(Output);
}
else if (m_Mode == Mode::Pretty)
@@ -435,12 +462,12 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100;
uint64_t Completed = NewState.TotalCount - NewState.RemainingCount;
- uint64_t ETAMS = (PercentDone > 5) ? (ElapsedTimeMS * NewState.RemainingCount) / Completed : 0;
+ uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS;
+ uint64_t ETAMS =
+ (NewState.Status == State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0;
uint32_t ConsoleColumns = GetConsoleColumns();
- std::string_view TaskString = NewState.Task;
-
const std::string PercentString = fmt::format("{:#3}%", PercentDone);
const std::string ProgressBarString =
@@ -454,7 +481,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
ExtendableStringBuilder<256> OutputBuilder;
- OutputBuilder << "\r" << TaskString << PercentString;
+ OutputBuilder << "\r" << Task << PercentString;
if (OutputBuilder.Size() + 1 < ConsoleColumns)
{
size_t RemainingSpace = ConsoleColumns - (OutputBuilder.Size() + 1);
@@ -549,7 +576,7 @@ ProgressBar::ForceLinebreak()
void
ProgressBar::Finish()
{
- if (m_LastOutputLength > 0)
+ if (m_LastOutputLength > 0 || m_State.RemainingCount > 0)
{
State NewState = m_State;
NewState.RemainingCount = 0;
@@ -846,39 +873,22 @@ main(int argc, char** argv)
.add_option("ue-trace", "", "tracefile", "Path to write a trace to", cxxopts::value<std::string>(TraceFile)->default_value(""), "");
#endif // ZEN_WITH_TRACE
- Options.parse_positional({"command"});
#if ZEN_USE_SENTRY
- bool NoSentry = false;
- bool SentryAllowPII = false;
- Options.add_options()("no-sentry", "Disable Sentry crash handler", cxxopts::value<bool>(NoSentry)->default_value("false"));
- Options.add_options()("sentry-allow-personal-info",
- "Allow personally identifiable information in sentry crash reports",
- cxxopts::value<bool>(SentryAllowPII)->default_value("false"));
+ bool NoSentry = false;
+ bool SentryAllowPII = false;
+ std::string SentryDsn;
+ Options
+ .add_option("sentry", "", "no-sentry", "Disable Sentry crash handler", cxxopts::value<bool>(NoSentry)->default_value("false"), "");
+ Options.add_option("sentry",
+ "",
+ "sentry-allow-personal-info",
+ "Allow personally identifiable information in sentry crash reports",
+ cxxopts::value<bool>(SentryAllowPII)->default_value("false"),
+ "");
+ Options.add_option("sentry", "", "sentry-dsn", "Sentry DSN to send events to", cxxopts::value<std::string>(SentryDsn), "");
#endif
-#if ZEN_USE_SENTRY
- SentryIntegration Sentry;
-
- if (NoSentry == false)
- {
- std::string SentryDatabasePath = (GetRunningExecutablePath().parent_path() / ".sentry-native").string();
-
- ExtendableStringBuilder<512> SB;
- for (int i = 0; i < argc; ++i)
- {
- if (i)
- {
- SB.Append(' ');
- }
-
- SB.Append(argv[i]);
- }
-
- Sentry.Initialize(SentryDatabasePath, {}, SentryAllowPII, SB.ToString());
-
- SentryIntegration::ClearCaches();
- }
-#endif
+ Options.parse_positional({"command"});
const bool IsNullInvoke = (argc == 1); // If no arguments are passed we want to print usage information
@@ -919,6 +929,30 @@ main(int argc, char** argv)
exit(0);
}
+#if ZEN_USE_SENTRY
+ SentryIntegration Sentry;
+
+ if (NoSentry == false)
+ {
+ std::string SentryDatabasePath = (std::filesystem::temp_directory_path() / ".zen-sentry-native").string();
+
+ ExtendableStringBuilder<512> SB;
+ for (int i = 0; i < argc; ++i)
+ {
+ if (i)
+ {
+ SB.Append(' ');
+ }
+
+ SB.Append(argv[i]);
+ }
+
+ Sentry.Initialize(SentryDatabasePath, {}, SentryDsn, SentryAllowPII, SB.ToString());
+
+ SentryIntegration::ClearCaches();
+ }
+#endif
+
zen::LoggingOptions LogOptions;
LogOptions.IsDebug = GlobalOptions.IsDebug;
LogOptions.IsVerbose = GlobalOptions.IsVerbose;
diff --git a/src/zen/zen.h b/src/zen/zen.h
index 9ca228e37..995bd13b6 100644
--- a/src/zen/zen.h
+++ b/src/zen/zen.h
@@ -79,6 +79,26 @@ public:
std::string Details;
uint64_t TotalCount = 0;
uint64_t RemainingCount = 0;
+ enum class EStatus
+ {
+ Running,
+ Aborted,
+ Paused
+ };
+ EStatus Status = EStatus::Running;
+
+ static EStatus CalculateStatus(bool IsAborted, bool IsPaused)
+ {
+ if (IsAborted)
+ {
+ return EStatus::Aborted;
+ }
+ if (IsPaused)
+ {
+ return EStatus::Paused;
+ }
+ return EStatus::Running;
+ }
};
enum class Mode
@@ -104,6 +124,7 @@ private:
const Mode m_Mode;
Stopwatch m_SW;
uint64_t m_LastUpdateMS;
+ uint64_t m_PausedMS;
State m_State;
const std::string m_SubTask;
size_t m_LastOutputLength = 0;
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index c4264bc29..46337ffc8 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <dirent.h>
# include <fcntl.h>
# include <sys/resource.h>
+# include <sys/mman.h>
# include <sys/stat.h>
# include <pwd.h>
# include <unistd.h>
@@ -43,6 +44,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <fcntl.h>
# include <libproc.h>
# include <sys/resource.h>
+# include <sys/mman.h>
# include <sys/stat.h>
# include <sys/syslimits.h>
# include <pwd.h>
@@ -2824,6 +2826,218 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly)
return Result;
}
+class SharedMemoryImpl : public SharedMemory
+{
+public:
+ struct Data
+ {
+ void* Handle = nullptr;
+ void* DataPtr = nullptr;
+ size_t Size = 0;
+ std::string Name;
+ };
+
+ static Data Open(std::string_view Name, size_t Size, bool SystemGlobal)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name));
+
+ HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, InstanceMapName.c_str());
+ if (hMap == NULL)
+ {
+ return {};
+ }
+ void* pBuf = MapViewOfFile(hMap, // handle to map object
+ FILE_MAP_ALL_ACCESS, // read/write permission
+ 0, // offset high
+ 0, // offset low
+ DWORD(Size)); // size
+
+ if (pBuf == NULL)
+ {
+ CloseHandle(hMap);
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ ZEN_UNUSED(SystemGlobal);
+ std::string InstanceMapName = fmt::format("/{}", Name);
+
+ int Fd = shm_open(InstanceMapName.c_str(), O_RDWR, 0666);
+ if (Fd < 0)
+ {
+ return {};
+ }
+ void* hMap = (void*)intptr_t(Fd);
+
+ struct stat Stat;
+ fstat(Fd, &Stat);
+
+ if (size_t(Stat.st_size) < Size)
+ {
+ close(Fd);
+ return {};
+ }
+
+ void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0);
+ if (pBuf == MAP_FAILED)
+ {
+ close(Fd);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ static Data Create(std::string_view Name, size_t Size, bool SystemGlobal)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name));
+
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (Success)
+ {
+ if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
+ {
+ ThrowLastError("SetSecurityDescriptorDacl failed");
+ }
+
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+
+ HANDLE hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file
+ &m_Attributes, // allow anyone to access
+ PAGE_READWRITE, // read/write access
+ 0, // maximum object size (high-order DWORD)
+ DWORD(Size), // maximum object size (low-order DWORD)
+ InstanceMapName.c_str());
+ if (hMap == NULL)
+ {
+ return {};
+ }
+ void* pBuf = MapViewOfFile(hMap, // handle to map object
+ FILE_MAP_ALL_ACCESS, // read/write permission
+ 0, // offset high
+ 0, // offset low
+ DWORD(Size)); // size
+
+ if (pBuf == NULL)
+ {
+ CloseHandle(hMap);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ ZEN_UNUSED(SystemGlobal);
+ std::string InstanceMapName = fmt::format("/{}", Name);
+
+ int Fd = shm_open(InstanceMapName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
+ if (Fd < 0)
+ {
+ return {};
+ }
+ fchmod(Fd, 0666);
+ void* hMap = (void*)intptr_t(Fd);
+
+ int Result = ftruncate(Fd, Size);
+ ZEN_UNUSED(Result);
+
+ void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0);
+ if (pBuf == MAP_FAILED)
+ {
+ close(Fd);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ static void Close(Data&& MemMap, bool Delete)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ ZEN_UNUSED(Delete);
+ if (MemMap.DataPtr != nullptr)
+ {
+ UnmapViewOfFile(MemMap.DataPtr);
+ MemMap.DataPtr = nullptr;
+ }
+ if (MemMap.Handle != nullptr)
+ {
+ CloseHandle(MemMap.Handle);
+ MemMap.Handle = nullptr;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (MemMap.DataPtr != nullptr)
+ {
+ munmap(MemMap.DataPtr, MemMap.Size);
+ MemMap.DataPtr = nullptr;
+ }
+
+ if (MemMap.Handle != nullptr)
+ {
+ int Fd = int(intptr_t(MemMap.Handle));
+ close(Fd);
+ MemMap.Handle = nullptr;
+ }
+ if (Delete)
+ {
+ std::string InstanceMapName = fmt::format("/{}", MemMap.Name);
+ shm_unlink(InstanceMapName.c_str());
+ }
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ SharedMemoryImpl(Data&& MemMap, bool IsOwned) : m_MemMap(std::move(MemMap)), m_IsOwned(IsOwned) {}
+ virtual ~SharedMemoryImpl()
+ {
+ try
+ {
+ Close(std::move(m_MemMap), /*Delete*/ m_IsOwned);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("SharedMemoryImpl::~SharedMemoryImpl threw exception: {}", Ex.what());
+ }
+ }
+
+ virtual void* GetData() override { return m_MemMap.DataPtr; }
+
+private:
+ Data m_MemMap;
+ const bool m_IsOwned = false;
+};
+
+std::unique_ptr<SharedMemory>
+OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal)
+{
+ SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Open(Name, Size, SystemGlobal);
+ if (MemMap.DataPtr)
+ {
+ return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ false);
+ }
+ return {};
+}
+
+std::unique_ptr<SharedMemory>
+CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal)
+{
+ SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Create(Name, Size, SystemGlobal);
+ if (MemMap.DataPtr)
+ {
+ return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ true);
+ }
+ return {};
+}
+
//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
@@ -3108,6 +3322,22 @@ TEST_CASE("RotateDirectories")
}
}
+TEST_CASE("SharedMemory")
+{
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false));
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, true));
+
+ {
+ auto Mem0 = CreateSharedMemory("SharedMemoryTest0", 482, false);
+ CHECK(Mem0);
+ strcpy((char*)Mem0->GetData(), "this is the string we are looking for");
+ auto Mem1 = OpenSharedMemory("SharedMemoryTest0", 482, false);
+ CHECK_EQ(std::string((char*)Mem0->GetData()), std::string((char*)Mem1->GetData()));
+ }
+
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false));
+}
+
#endif
} // namespace zen
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index dfd0eedc9..36d4d1b68 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -385,6 +385,16 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly);
+class SharedMemory
+{
+public:
+ virtual ~SharedMemory() {}
+ virtual void* GetData() = 0;
+};
+
+std::unique_ptr<SharedMemory> OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal);
+std::unique_ptr<SharedMemory> CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal);
+
//////////////////////////////////////////////////////////////////////////
void filesystem_forcelink(); // internal
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index d1394cd9a..d3e1de703 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -98,7 +98,7 @@ ZENCORE_API int GetCurrentProcessId();
int GetProcessId(CreateProcResult ProcId);
std::filesystem::path GetProcessExecutablePath(int Pid, std::error_code& OutEc);
-std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle);
+std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf = true);
void process_forcelink(); // internal
diff --git a/src/zencore/include/zencore/sentryintegration.h b/src/zencore/include/zencore/sentryintegration.h
index 40e22af4e..d14c1c275 100644
--- a/src/zencore/include/zencore/sentryintegration.h
+++ b/src/zencore/include/zencore/sentryintegration.h
@@ -31,8 +31,12 @@ public:
SentryIntegration();
~SentryIntegration();
- void Initialize(std::string SentryDatabasePath, std::string SentryAttachmentsPath, bool AllowPII, const std::string& CommandLine);
- void LogStartupInformation();
+ void Initialize(std::string SentryDatabasePath,
+ std::string SentryAttachmentsPath,
+ std::string SentryDsn,
+ bool AllowPII,
+ const std::string& CommandLine);
+ void LogStartupInformation();
static void ClearCaches();
private:
diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp
index 48efc3f85..fcbe657cb 100644
--- a/src/zencore/process.cpp
+++ b/src/zencore/process.cpp
@@ -929,7 +929,7 @@ GetProcessExecutablePath(int Pid, std::error_code& OutEc)
}
std::error_code
-FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle)
+FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf)
{
#if ZEN_PLATFORM_WINDOWS
HANDLE ProcessSnapshotHandle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
@@ -939,13 +939,15 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
auto _ = MakeGuard([&]() { CloseHandle(ProcessSnapshotHandle); });
+ const DWORD ThisProcessId = ::GetCurrentProcessId();
+
PROCESSENTRY32 Entry;
Entry.dwSize = sizeof(PROCESSENTRY32);
if (Process32First(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry))
{
do
{
- if (ExecutableImage.filename() == Entry.szExeFile)
+ if ((IncludeSelf || (Entry.th32ProcessID != ThisProcessId)) && (ExecutableImage.filename() == Entry.szExeFile))
{
std::error_code Ec;
std::filesystem::path EntryPath = GetProcessExecutablePath(Entry.th32ProcessID, Ec);
@@ -970,6 +972,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
}
} while (::Process32Next(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry));
+ return {};
}
return MakeErrorCodeFromLastError();
#endif // ZEN_PLATFORM_WINDOWS
@@ -980,6 +983,8 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
struct kinfo_proc* Processes = nullptr;
uint32_t ProcCount = 0;
+ const pid_t ThisProcessId = getpid();
+
if (sysctl(Mib, 4, NULL, &BufferSize, NULL, 0) != -1 && BufferSize > 0)
{
struct kinfo_proc* Processes = (struct kinfo_proc*)malloc(BufferSize);
@@ -990,36 +995,46 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
char Buffer[PROC_PIDPATHINFO_MAXSIZE];
for (uint32_t ProcIndex = 0; ProcIndex < ProcCount; ProcIndex++)
{
- pid_t Pid = Processes[ProcIndex].kp_proc.p_pid;
- std::error_code Ec;
- std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec);
- if (!Ec)
+ pid_t Pid = Processes[ProcIndex].kp_proc.p_pid;
+ if (IncludeSelf || (Pid != ThisProcessId))
{
- if (EntryPath == ExecutableImage)
+ std::error_code Ec;
+ std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec);
+ if (!Ec)
{
- if (Processes[ProcIndex].kp_proc.p_stat != SZOMB)
+ if (EntryPath == ExecutableImage)
{
- OutHandle.Initialize(Pid, Ec);
- return Ec;
+ if (Processes[ProcIndex].kp_proc.p_stat != SZOMB)
+ {
+ OutHandle.Initialize(Pid, Ec);
+ return Ec;
+ }
}
}
+ Ec.clear();
}
}
+ return {};
}
}
return MakeErrorCodeFromLastError();
#endif // ZEN_PLATFORM_MAC
#if ZEN_PLATFORM_LINUX
+ const pid_t ThisProcessId = getpid();
+
std::vector<uint32_t> RunningPids;
DirectoryContent ProcList;
GetDirectoryContent("/proc", DirectoryContentFlags::IncludeDirs, ProcList);
for (const std::filesystem::path& EntryPath : ProcList.Directories)
{
std::string EntryName = EntryPath.stem();
- std::optional<uint32_t> Pid = ParseInt<uint32_t>(EntryName);
- if (Pid.has_value())
+ std::optional<uint32_t> PidMaybe = ParseInt<uint32_t>(EntryName);
+ if (PidMaybe.has_value())
{
- RunningPids.push_back(Pid.value());
+ if (pid_t Pid = PidMaybe.value(); IncludeSelf || (Pid != ThisProcessId))
+ {
+ RunningPids.push_back(Pid);
+ }
}
}
@@ -1042,6 +1057,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
}
}
+ Ec.clear();
}
return {};
#endif // ZEN_PLATFORM_LINUX
@@ -1065,10 +1081,24 @@ TEST_CASE("Process")
TEST_CASE("FindProcess")
{
- ProcessHandle Process;
- std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process);
- CHECK(!Ec);
- CHECK(Process.IsValid());
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ true);
+ CHECK(!Ec);
+ CHECK(Process.IsValid());
+ }
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ false);
+ CHECK(!Ec);
+ CHECK(!Process.IsValid());
+ }
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess("this/does\\not/exist\\123914921929412312312312asdad\\12134.no", Process, /*IncludeSelf*/ false);
+ CHECK(!Ec);
+ CHECK(!Process.IsValid());
+ }
}
TEST_CASE("BuildArgV")
diff --git a/src/zencore/sentryintegration.cpp b/src/zencore/sentryintegration.cpp
index d08fb7f1d..520d5162e 100644
--- a/src/zencore/sentryintegration.cpp
+++ b/src/zencore/sentryintegration.cpp
@@ -31,6 +31,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace sentry {
+namespace {
+ static const std::string DefaultDsn("https://[email protected]/5919284");
+}
+
struct SentryAssertImpl : zen::AssertImpl
{
virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION
@@ -194,6 +198,7 @@ SentryIntegration::~SentryIntegration()
void
SentryIntegration::Initialize(std::string SentryDatabasePath,
std::string SentryAttachmentsPath,
+ std::string SentryDsn,
bool AllowPII,
const std::string& CommandLine)
{
@@ -204,7 +209,7 @@ SentryIntegration::Initialize(std::string SentryDatabasePath,
SentryDatabasePath = SentryDatabasePath.substr(4);
}
sentry_options_t* SentryOptions = sentry_options_new();
- sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284");
+ sentry_options_set_dsn(SentryOptions, SentryDsn.empty() ? sentry::DefaultDsn.c_str() : SentryDsn.c_str());
sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str());
sentry_options_set_logger(SentryOptions, SentryLogFunction, this);
if (!SentryAttachmentsPath.empty())
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index f2b26b922..a2d323b5e 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -333,7 +333,7 @@ namespace detail {
m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize));
}
m_BufferStart = Begin;
- m_BufferEnd = Min(Begin + m_BufferSize, m_FileSize);
+ m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd);
Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count);
diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp
index 916b25bff..39efe1d0c 100644
--- a/src/zenhttp/httpclientauth.cpp
+++ b/src/zenhttp/httpclientauth.cpp
@@ -7,6 +7,7 @@
#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
+#include <zencore/uid.h>
#include <zenhttp/auth/authmgr.h>
#include <ctime>
@@ -94,9 +95,8 @@ namespace zen { namespace httpclientauth {
CreateProcOptions ProcOptions;
- const std::string AuthTokenPath = ".zen-auth-oidctoken";
-
- auto _ = MakeGuard([AuthTokenPath]() { RemoveFile(AuthTokenPath); });
+ const std::filesystem::path AuthTokenPath(std::filesystem::temp_directory_path() / fmt::format(".zen-auth-{}", Oid::NewOid()));
+ auto _ = MakeGuard([AuthTokenPath]() { RemoveFile(AuthTokenPath); });
const std::string ProcArgs = fmt::format("{} --AuthConfigUrl {} --OutFile {} --Unattended={}",
OidcExecutablePath,
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f7e63433b..9f2e826d6 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
Stopwatch Timer;
auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
@@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
});
}
- Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_INFO("Replayed {} of {} requests, elapsed {}",
RequestCount - PendingWork,
RequestCount,
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index e097147fc..055376b5c 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -443,6 +443,7 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv);
LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv);
LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv);
+ LuaOptions.AddOption("server.sentry.dsn"sv, ServerOptions.SentryDsn, "sentry-dsn"sv);
LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv);
LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv);
LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv);
@@ -762,6 +763,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("sentry-allow-personal-info",
"Allow personally identifiable information in sentry crash reports",
cxxopts::value<bool>(ServerOptions.SentryAllowPII)->default_value("false"));
+ options.add_options()("sentry-dsn", "Sentry DSN to send events to", cxxopts::value<std::string>(ServerOptions.SentryDsn));
options.add_options()("detach",
"Indicate whether zenserver should detach from parent process group",
cxxopts::value<bool>(ServerOptions.Detach)->default_value("true"));
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 1d7d22ce9..1a1793b8d 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -169,31 +169,32 @@ struct ZenServerOptions
ZenBuildStoreConfig BuildStoreConfig;
ZenStatsConfig StatsConfig;
ZenWorkspacesConfig WorksSpacesConfig;
- std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
- std::filesystem::path DataDir; // Root directory for state (used for testing)
- std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
- std::filesystem::path AbsLogFile; // Absolute path to main log file
- std::filesystem::path ConfigFile; // Path to Lua config file
- std::filesystem::path PluginsConfigFile; // Path to plugins config file
- std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start)
- std::string ChildId; // Id assigned by parent process (used for lifetime management)
- std::string LogId; // Id for tagging log output
- std::string EncryptionKey; // 256 bit AES encryption key
- std::string EncryptionIV; // 128 bit AES initialization vector
- int BasePort = 8558; // Service listen port (used for both UDP and TCP)
- int OwnerPid = 0; // Parent process id (zero for standalone)
- bool InstallService = false; // Flag used to initiate service install (temporary)
- bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
- bool IsDebug = false;
- bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not
- bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization
- bool IsTest = false;
- bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
- bool ShouldCrash = false; // Option for testing crash handling
- bool IsFirstRun = false;
- bool NoSentry = false;
- bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
- bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux)
+ std::filesystem::path SystemRootDir; // System root directory (used for machine level config)
+ std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path AbsLogFile; // Absolute path to main log file
+ std::filesystem::path ConfigFile; // Path to Lua config file
+ std::filesystem::path PluginsConfigFile; // Path to plugins config file
+ std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start)
+ std::string ChildId; // Id assigned by parent process (used for lifetime management)
+ std::string LogId; // Id for tagging log output
+ std::string EncryptionKey; // 256 bit AES encryption key
+ std::string EncryptionIV; // 128 bit AES initialization vector
+ int BasePort = 8558; // Service listen port (used for both UDP and TCP)
+ int OwnerPid = 0; // Parent process id (zero for standalone)
+ bool InstallService = false; // Flag used to initiate service install (temporary)
+ bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
+ bool IsDebug = false;
+ bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not
+ bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization
+ bool IsTest = false;
+ bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
+ bool ShouldCrash = false; // Option for testing crash handling
+ bool IsFirstRun = false;
+ bool NoSentry = false;
+ bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
+ std::string SentryDsn;
+ bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux)
bool ObjectStoreEnabled = false;
bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
std::string Loggers[zen::logging::level::LogLevelCount];
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 0f647cd5c..868126533 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -101,7 +101,11 @@ ZenEntryPoint::Run()
std::string SentryDatabasePath = (m_ServerOptions.DataDir / ".sentry-native").string();
std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string();
- Sentry.Initialize(SentryDatabasePath, SentryAttachmentPath, m_ServerOptions.SentryAllowPII, m_ServerOptions.CommandLine);
+ Sentry.Initialize(SentryDatabasePath,
+ SentryAttachmentPath,
+ m_ServerOptions.SentryDsn,
+ m_ServerOptions.SentryAllowPII,
+ m_ServerOptions.CommandLine);
}
#endif
try
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 3ec4373a2..6359b9db9 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1010,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted
.OpCoreHash = OpData.OpCoreHash,
.OpKeyHash = OpData.KeyHash};
- m_Oplog.Append(Entry);
m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset);
+ m_Oplog.Append(Entry);
return Entry;
}
@@ -1597,7 +1597,8 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
};
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
if (OptionalWorkerPool)
@@ -2111,7 +2112,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
{
std::atomic_bool Result = true;
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
@@ -3890,7 +3892,8 @@ ProjectStore::Flush()
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (const Ref<Project>& Project : Projects)
{
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 5081ae65d..7b56c64bd 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -70,7 +70,7 @@ BlockStoreFile::Open()
return false;
}
ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft);
- Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms
+ Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms
RetriesLeft--;
return true;
});
@@ -286,6 +286,14 @@ BlockStore::BlockStore()
BlockStore::~BlockStore()
{
+ try
+ {
+ Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BlockStore() failed with: ", Ex.what());
+ }
}
void
@@ -307,6 +315,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
if (IsDir(m_BlocksBasePath))
{
+ std::vector<std::filesystem::path> EmptyBlockFiles;
uint32_t NextBlockIndex = 0;
std::vector<std::filesystem::path> FoldersToScan;
FoldersToScan.push_back(m_BlocksBasePath);
@@ -334,6 +343,12 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
{
continue;
}
+ if (Entry.file_size() == 0)
+ {
+ EmptyBlockFiles.push_back(Path);
+ continue;
+ }
+
Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)};
BlockFile->Open();
m_TotalSize.fetch_add(BlockFile->TotalSize(), std::memory_order::relaxed);
@@ -347,6 +362,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
++FolderOffset;
}
+
+ for (const std::filesystem::path& EmptyBlockFile : EmptyBlockFiles)
+ {
+ std::error_code Ec;
+ RemoveFile(EmptyBlockFile, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Unable to remove empty block file {}. Reason: {}", EmptyBlockFile, Ec.message());
+ }
+ }
+
m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release);
}
else
@@ -355,7 +381,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
}
}
-void
+BlockStore::BlockIndexSet
BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
{
ZEN_MEMSCOPE(GetBlocksTag());
@@ -363,8 +389,8 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- tsl::robin_set<uint32_t> MissingBlocks;
- tsl::robin_set<uint32_t> DeleteBlocks;
+ BlockIndexSet MissingBlocks;
+ BlockIndexSet DeleteBlocks;
DeleteBlocks.reserve(m_ChunkBlocks.size());
for (auto It : m_ChunkBlocks)
{
@@ -383,13 +409,6 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
MissingBlocks.insert(BlockIndex);
}
}
- for (std::uint32_t BlockIndex : MissingBlocks)
- {
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
- Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath));
- NewBlockFile->Create(0);
- m_ChunkBlocks[BlockIndex] = NewBlockFile;
- }
for (std::uint32_t BlockIndex : DeleteBlocks)
{
std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex);
@@ -400,6 +419,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks)
}
m_ChunkBlocks.erase(BlockIndex);
}
+ return MissingBlocks;
}
BlockStore::BlockEntryCountMap
@@ -1037,6 +1057,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
{
Continue = ChangeCallback(MovedChunks, ScrubbedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0);
DeletedSize += RemovedSize;
+ m_TotalSize.fetch_add(AddedSize);
RemovedSize = 0;
AddedSize = 0;
MovedCount += MovedChunks.size();
@@ -1220,14 +1241,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NiceBytes(Space.Free + ReclaimedSpace));
}
NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- AddedSize += WriteOffset;
+ NewBlockIndex = NextBlockIndex;
WriteOffset = 0;
TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize));
}
- WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment);
+ const uint64_t OldWriteOffset = WriteOffset;
+ WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment);
TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset);
MovedChunks.push_back(
@@ -1235,8 +1255,9 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
WriteOffset += ChunkLocation.Size;
MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset;
+ uint64_t WrittenBytes = WriteOffset - OldWriteOffset;
+ AddedSize += WrittenBytes;
}
- AddedSize += WriteOffset;
ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}",
LogPrefix,
KeepChunkIndexes.size(),
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index 41c747e08..c25f762f5 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -177,22 +177,60 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
m_Config.SmallBlobBlockStoreAlignement,
IsNew);
m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
+
+ BlockStore::BlockIndexSet KnownBlocks;
+ for (const BlobEntry& Blob : m_BlobEntries)
{
- BlockStore::BlockIndexSet KnownBlocks;
- for (const BlobEntry& Blob : m_BlobEntries)
+ if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
{
- if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
- {
- const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
- KnownBlocks.insert(Metadata.Location.BlockIndex);
- }
+ const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
+ KnownBlocks.insert(Metadata.Location.BlockIndex);
}
- m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
}
+ BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
+ if (!MissingBlocks.empty())
+ {
+ std::vector<MetadataDiskEntry> MissingMetadatas;
+ for (auto& It : m_BlobLookup)
+ {
+ const IoHash& BlobHash = It.first;
+ const BlobIndex ReadBlobIndex = It.second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (ReadBlobEntry.Metadata)
+ {
+ const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata];
+ if (MissingBlocks.contains(MetaData.Location.BlockIndex))
+ {
+ MissingMetadatas.push_back(
+ MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash});
+ MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ m_MetadataEntries[ReadBlobEntry.Metadata] = {};
+ m_BlobEntries[ReadBlobIndex].Metadata = {};
+ }
+ }
+ }
+ ZEN_ASSERT(!MissingMetadatas.empty());
+
+ for (const MetadataDiskEntry& Entry : MissingMetadatas)
+ {
+ auto It = m_BlobLookup.find(Entry.BlobHash);
+ ZEN_ASSERT(It != m_BlobLookup.end());
+
+ const BlobIndex ReadBlobIndex = It->second;
+ const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
+ if (!ReadBlobEntry.Payload)
+ {
+ m_BlobLookup.erase(It);
+ }
+ }
+ m_MetadatalogFile.Append(MissingMetadatas);
+ CompactState();
+ }
+
m_Gc.AddGcReferencer(*this);
m_Gc.AddGcReferenceLocker(*this);
m_Gc.AddGcStorage(this);
@@ -256,34 +294,36 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
ZEN_UNUSED(Result);
Entry = PayloadEntry(0, PayloadSize);
}
- m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- const BlobIndex ExistingBlobIndex = It->second;
- BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
- if (Blob.Payload)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- m_PayloadEntries[Blob.Payload] = Entry;
+ const BlobIndex ExistingBlobIndex = It->second;
+ BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ if (Blob.Payload)
+ {
+ m_PayloadEntries[Blob.Payload] = Entry;
+ }
+ else
+ {
+ Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ m_PayloadEntries.push_back(Entry);
+ }
+ Blob.LastAccessTime = GcClock::TickCount();
}
else
{
- Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
+ PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
m_PayloadEntries.push_back(Entry);
- }
- Blob.LastAccessTime = GcClock::TickCount();
- }
- else
- {
- PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
- m_PayloadEntries.push_back(Entry);
- const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
- // we only remove during GC and compact this then...
- m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
- m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size()));
+ // we only remove during GC and compact this then...
+ m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
+ m_BlobLookup.insert({BlobHash, NewBlobIndex});
+ }
}
+ m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
m_LastAccessTimeUpdateCount++;
}
@@ -370,7 +410,6 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
const BlockStoreLocation& Location = Locations[LocationIndex];
MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
- m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
@@ -396,6 +435,9 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
+
+ m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
+
m_LastAccessTimeUpdateCount++;
WriteBlobIndex++;
if (m_TrackedCacheKeys)
@@ -483,7 +525,8 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (!MetaLocations.empty())
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
m_MetadataBlockStore.IterateChunks(
MetaLocations,
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index e973cee77..0ee70890c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc,
ZenCacheDiskLayer::CacheBucket::~CacheBucket()
{
+ try
+ {
+ m_SlogFile.Flush();
+ m_SlogFile.Close();
+ m_BlockStore.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CacheBucket() failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferencer(*this);
}
@@ -824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
void
-ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc)
+ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition,
+ bool ResetLog,
+ const std::function<uint64_t()>& ClaimDiskReserveFunc)
{
ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot");
- if (m_LogFlushPosition == m_SlogFile.GetLogCount())
+ if (m_LogFlushPosition == LogPosition)
{
return;
}
@@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir));
}
- const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount();
+ const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition;
cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount,
.LogPosition = IndexLogPosition,
@@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st
if (IsFile(LogPath))
{
+ m_SlogFile.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
// the end it will be the same result
ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
}
}
- if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0)
- {
- WriteIndexSnapshot(IndexLock, /*Flush log*/ true);
- }
-
- m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_Index)
{
@@ -1173,7 +1180,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco
KnownBlocks.insert(BlockIndex);
}
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<DiskIndexEntry> MissingEntries;
+
+ for (auto& It : m_Index)
+ {
+ BucketPayload& Payload = m_Payloads[It.second];
+ DiskLocation Location = Payload.Location;
+ if (!Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex()))
+ {
+ RemoveMemCachedData(IndexLock, Payload);
+ RemoveMetaData(IndexLock, Payload);
+ }
+ }
+ Location.Flags |= DiskLocation::kTombStone;
+ MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location});
+ }
+
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const DiskIndexEntry& Entry : MissingEntries)
+ {
+ m_Index.erase(Entry.Key);
+ }
+ m_SlogFile.Append(MissingEntries);
+ m_SlogFile.Flush();
+ {
+ std::vector<BucketPayload> Payloads;
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketMetaData> MetaDatas;
+ std::vector<MemCacheData> MemCachedPayloads;
+ IndexMap Index;
+ CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index);
+ }
+ RemovedEntries = true;
+ }
+
+ if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries)
+ {
+ WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true);
+ }
}
void
@@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot");
try
{
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t LogPosition = m_SlogFile.GetLogCount();
+
bool UseLegacyScheme = false;
IoBuffer Buffer;
@@ -2038,7 +2094,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
// Note: this copy could be eliminated on shutdown to
// reduce memory usage and execution time
Index = m_Index;
@@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl
else
{
RwLock::SharedLockScope IndexLock(m_IndexLock);
- WriteIndexSnapshot(IndexLock, /*Flush log*/ false);
+ WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false);
const uint64_t EntryCount = m_Index.size();
Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount);
uint64_t SidecarSize = ManifestWriter.GetSidecarSize();
@@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
ZEN_MEMSCOPE(GetCacheDiskTag());
ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation");
DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags);
- m_SlogFile.Append({.Key = HashKey, .Location = Location});
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
if (m_TrackedCacheKeys)
@@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
m_AccessTimes.emplace_back(GcClock::TickCount());
m_Index.insert_or_assign(HashKey, EntryIndex);
}
+ m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
}
@@ -3978,7 +4034,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (auto& BucketPath : FoundBucketDirectories)
{
Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
@@ -4141,7 +4198,8 @@ ZenCacheDiskLayer::Flush()
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
try
{
for (auto& Bucket : Buckets)
@@ -4164,8 +4222,10 @@ ZenCacheDiskLayer::Flush()
{
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- Work.Wait(1000,
- [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); });
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
+ });
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 75562176e..2ab5752ff 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get("
CasContainerStrategy::~CasContainerStrategy()
{
+ try
+ {
+ m_BlockStore.Close();
+ m_CasLog.Flush();
+ m_CasLog.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what());
+ }
m_Gc.RemoveGcReferenceStore(*this);
m_Gc.RemoveGcStorage(this);
}
@@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
ZEN_TRACE_CPU("CasContainer::UpdateLocation");
BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
m_LocationMap.emplace(ChunkHash, m_Locations.size());
m_Locations.push_back(DiskLocation);
}
+ m_CasLog.Append(IndexEntry);
});
return CasStore::InsertResult{.New = true};
@@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
IndexEntries.emplace_back(
CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)});
}
- m_CasLog.Append(IndexEntries);
{
RwLock::ExclusiveLockScope _(m_LocationMapLock);
for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
@@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c
m_Locations.push_back(DiskIndexEntry.Location);
}
}
+ m_CasLog.Append(IndexEntries);
});
return Result;
}
@@ -402,7 +412,8 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
std::atomic<bool> AsyncContinue = true;
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
const bool Continue = m_BlockStore.IterateChunks(
FoundChunkLocations,
[this,
@@ -745,19 +756,27 @@ public:
MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location});
}
}
- for (size_t ScrubbedIndex : ScrubbedArray)
+ for (size_t ChunkIndex : ScrubbedArray)
{
- const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex];
+ const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key);
It != m_CasContainerStrategy.m_LocationMap.end())
{
- BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second];
+ const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex);
+ if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation)
+ {
+ // Someone has moved our chunk so lets just skip the new location we were provided, it will be
+ // GC:d at a later time
+ continue;
+ }
MovedEntries.push_back(
CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
m_CasContainerStrategy.m_LocationMap.erase(It);
}
}
m_CasContainerStrategy.m_CasLog.Append(MovedEntries);
+ m_CasContainerStrategy.m_CasLog.Flush();
Stats.RemovedDisk += FreedDiskSpace;
if (Ctx.IsCancelledFlag.load())
{
@@ -983,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
{
// Write the current state of the location map to a new index state
std::vector<CasDiskIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope ___(m_LocationMapLock);
- if (!ResetLog)
- {
- IndexLogPosition = m_CasLog.GetLogCount();
- }
Entries.resize(m_LocationMap.size());
uint64_t EntryIndex = 0;
@@ -1035,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog)
if (IsFile(LogPath))
{
+ m_CasLog.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
// the end it will be the same result
ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
@@ -1153,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
SkipEntryCount = 0;
}
- LogEntryCount = EntryCount - SkipEntryCount;
+ LogEntryCount = SkipEntryCount;
CasLog.Replay(
[&](const CasDiskIndexEntry& Record) {
LogEntryCount++;
@@ -1172,7 +1190,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski
m_Locations.push_back(Record.Location);
},
SkipEntryCount);
-
return LogEntryCount;
}
return 0;
@@ -1228,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
}
- m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
BlockStore::BlockIndexSet KnownBlocks;
for (const auto& Entry : m_LocationMap)
@@ -1239,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
KnownBlocks.insert(BlockIndex);
}
- m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+ BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
+
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ bool RemovedEntries = false;
+ if (!MissingBlocks.empty())
+ {
+ std::vector<CasDiskIndexEntry> MissingEntries;
+ for (auto& It : m_LocationMap)
+ {
+ const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex();
+ if (MissingBlocks.contains(BlockIndex))
+ {
+ MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ }
+ ZEN_ASSERT(!MissingEntries.empty());
+
+ for (const CasDiskIndexEntry& Entry : MissingEntries)
+ {
+ m_LocationMap.erase(Entry.Key);
+ }
+ m_CasLog.Append(MissingEntries);
+ m_CasLog.Flush();
+
+ {
+ RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock);
+ CompactIndex(IndexLock);
+ }
+ RemovedEntries = true;
+ }
- if (IsNewStore || (LogEntryCount > 0))
+ if (IsNewStore || (LogEntryCount > 0) || RemovedEntries)
{
MakeIndexSnapshot(/*ResetLog*/ true);
}
@@ -1611,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert")
}
}
+TEST_CASE("compactcas.restart")
+{
+ uint64_t ExpectedSize = 0;
+
+ auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
+ WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+
+ Latch WorkLatch(1);
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup;
+ ChunkHashesLookup.reserve(ChunkCount);
+ RwLock InsertLock;
+ for (size_t Offset = 0; Offset < ChunkCount;)
+ {
+ size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u);
+ WorkLatch.AddCount(1);
+ ThreadPool.ScheduleWork(
+ [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ std::vector<IoBuffer> BatchBlobs;
+ std::vector<IoHash> BatchHashes;
+ BatchBlobs.reserve(BatchCount);
+ BatchHashes.reserve(BatchCount);
+
+ while (BatchBlobs.size() < BatchCount)
+ {
+ IoBuffer Chunk =
+ CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377));
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ if (ChunkHashesLookup.contains(Hash))
+ {
+ continue;
+ }
+ ChunkHashesLookup.insert(Hash);
+ ExpectedSize += Chunk.Size();
+ }
+
+ BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer());
+ BatchHashes.push_back(Hash);
+ }
+
+ Cas.InsertChunks(BatchBlobs, BatchHashes);
+ {
+ RwLock::ExclusiveLockScope __(InsertLock);
+ Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end());
+ }
+ });
+ Offset += BatchCount;
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path CasPath = TempDir.Path();
+ CreateDirectories(CasPath);
+
+ bool Generate = false;
+ if (!Generate)
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ }
+
+ const uint64_t kChunkSize = 1048 + 395;
+ const size_t kChunkCount = 7167;
+
+ std::vector<IoHash> Hashes;
+ Hashes.reserve(kChunkCount);
+
+ auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) {
+ for (const IoHash& Hash : Hashes)
+ {
+ if (ShouldExist)
+ {
+ CHECK(Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(Buffer);
+ IoHash ValidateHash;
+ uint64_t ValidateRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize);
+ CHECK(Compressed);
+ CHECK(ValidateHash == Hash);
+ }
+ else
+ {
+ CHECK(!Cas.HaveChunk(Hash));
+ IoBuffer Buffer = Cas.FindChunk(Hash);
+ CHECK(!Buffer);
+ }
+ }
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ class GcRefChecker : public GcReferenceChecker
+ {
+ public:
+ explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {}
+ ~GcRefChecker() {}
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); }
+ void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
+ std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
+ {
+ ZEN_UNUSED(Ctx);
+ return KeepUnusedReferences(m_HashesToKeep, IoCids);
+ }
+
+ private:
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ class GcRef : public GcReferencer
+ {
+ public:
+ GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc)
+ {
+ m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end());
+ m_Gc.AddGcReferencer(*this);
+ }
+ ~GcRef() { m_Gc.RemoveGcReferencer(*this); }
+ std::string GetGcName(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return "test";
+ }
+ GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override
+ {
+ ZEN_UNUSED(Ctx, Stats);
+ return nullptr;
+ }
+ std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {new GcRefChecker(std::move(m_HashesToKeep))};
+ }
+ std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override
+ {
+ ZEN_UNUSED(Ctx);
+ return {};
+ }
+
+ private:
+ GcManager& m_Gc;
+ std::vector<IoHash> m_HashesToKeep;
+ };
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes);
+ ValidateChunks(Cas, Hashes, true);
+ if (true)
+ {
+ std::vector<IoHash> DropHashes;
+ std::vector<IoHash> KeepHashes;
+ for (size_t Index = 0; Index < Hashes.size(); Index++)
+ {
+ if (Index % 5 == 0)
+ {
+ KeepHashes.push_back(Hashes[Index]);
+ }
+ else
+ {
+ DropHashes.push_back(Hashes[Index]);
+ }
+ }
+ // std::span<const IoHash> KeepHashes(Hashes);
+ // ZEN_ASSERT(ExpectedGcCount < Hashes.size());
+ // KeepHashes = KeepHashes.subspan(ExpectedGcCount);
+ GcRef Ref(Gc, KeepHashes);
+ Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true});
+ ValidateChunks(Cas, KeepHashes, true);
+ ValidateChunks(Cas, DropHashes, false);
+ Hashes = KeepHashes;
+ }
+ GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ Cas.Flush();
+ ValidateChunks(Cas, Hashes, true);
+ }
+
+ {
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(CasPath, "test", 65536 * 128, 8, false);
+ ValidateChunks(Cas, Hashes, true);
+ }
+}
+
TEST_CASE("compactcas.iteratechunks")
{
std::atomic<size_t> WorkCompleted = 0;
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 56979f267..11a266f1c 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -664,7 +664,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
};
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
if (!AsyncContinue)
@@ -968,14 +969,11 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog)
{
// Write the current state of the location map to a new index state
std::vector<FileCasIndexEntry> Entries;
- uint64_t IndexLogPosition = 0;
+ // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock
+ const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount();
{
RwLock::SharedLockScope __(m_Lock);
- if (!ResetLog)
- {
- IndexLogPosition = m_CasLog.GetLogCount();
- }
Entries.resize(m_Index.size());
uint64_t EntryIndex = 0;
@@ -1018,12 +1016,14 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog)
if (IsFile(LogPath))
{
+ m_CasLog.Close();
if (!RemoveFile(LogPath, Ec) || Ec)
{
// This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in
// the end it will be the same result
ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message());
}
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
}
}
m_LogFlushPosition = IndexLogPosition;
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 8cbcad11b..fce05766f 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -147,10 +147,10 @@ public:
typedef tsl::robin_set<uint32_t> BlockIndexSet;
- // Ask the store to create empty blocks for all locations that does not have a block
// Remove any block that is not referenced
- void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks);
- BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent);
+ // Return a list of blocks that are not present
+ [[nodiscard]] BlockIndexSet SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks);
+ BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent);
void Close();
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index b67a043df..3cd2d6423 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -409,19 +409,22 @@ public:
void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
void WriteIndexSnapshot(
RwLock::ExclusiveLockScope&,
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
{
- WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc);
+ WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc);
}
void WriteIndexSnapshot(
RwLock::SharedLockScope&,
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; })
{
- WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc);
+ WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc);
}
void WriteIndexSnapshotLocked(
+ uint64_t LogPosition,
bool ResetLog,
const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; });
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index c7532e098..cd1bf7dd7 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -758,14 +758,15 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
}
ChunkedFolderContent
-ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateIntervalMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag)
+ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag)
{
ZEN_TRACE_CPU("ChunkFolderContent");
@@ -804,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t PathIndex : Order)
{
@@ -831,10 +832,9 @@ ChunkFolderContent(ChunkingStatistics& Stats,
});
}
- Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(PendingWork);
- UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining());
+ UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining());
});
}
return Result;
diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp
index 0dffa42f0..afef7f6f2 100644
--- a/src/zenutil/commandlineoptions.cpp
+++ b/src/zenutil/commandlineoptions.cpp
@@ -157,12 +157,7 @@ MakeSafeAbsolutePath(const std::filesystem::path& Path)
std::filesystem::path
StringToPath(const std::string_view& Path)
{
- std::string_view UnquotedPath = Path;
-
- if (UnquotedPath.length() > 2 && UnquotedPath.front() == '\"' && UnquotedPath.back() == '\"')
- {
- UnquotedPath = UnquotedPath.substr(1, UnquotedPath.length() - 2);
- }
+ std::string_view UnquotedPath = RemoveQuotes(Path);
if (UnquotedPath.ends_with('/') || UnquotedPath.ends_with('\\') || UnquotedPath.ends_with(std::filesystem::path::preferred_separator))
{
@@ -172,6 +167,19 @@ StringToPath(const std::string_view& Path)
return std::filesystem::path(UnquotedPath).make_preferred();
}
+std::string_view
+RemoveQuotes(const std::string_view& Arg)
+{
+ if (Arg.length() > 2)
+ {
+ if (Arg[0] == '"' && Arg[Arg.length() - 1] == '"')
+ {
+ return Arg.substr(1, Arg.length() - 2);
+ }
+ }
+ return Arg;
+}
+
#if ZEN_WITH_TESTS
void
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 225b1a3a5..306a5d990 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -132,14 +132,15 @@ struct ChunkingStatistics
uint64_t ElapsedWallTimeUS = 0;
};
-ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateIntervalMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag);
+ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag);
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h
index b7581f6cd..f927d41e5 100644
--- a/src/zenutil/include/zenutil/commandlineoptions.h
+++ b/src/zenutil/include/zenutil/commandlineoptions.h
@@ -22,6 +22,7 @@ std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArg
void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path);
[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path);
std::filesystem::path StringToPath(const std::string_view& Path);
+std::string_view RemoveQuotes(const std::string_view& Arg);
void commandlineoptions_forcelink(); // internal
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index 08e730b28..d7e986551 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -12,13 +12,13 @@ namespace zen {
class ParallelWork
{
public:
- ParallelWork(std::atomic<bool>& AbortFlag);
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
~ParallelWork();
- typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
- typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
- typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback;
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
{
@@ -28,6 +28,10 @@ public:
WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
try
{
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
Work(m_AbortFlag);
}
catch (...)
@@ -59,6 +63,7 @@ private:
void RethrowErrors();
std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
bool m_DispatchComplete = false;
Latch m_PendingWork;
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 01a703a1b..1fd59acdf 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -379,9 +379,10 @@ JupiterResult
JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
+ std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
}
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index ecacc4b5a..67fc03c04 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -14,7 +14,10 @@
namespace zen {
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1)
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+: m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_PendingWork(1)
{
}
@@ -59,7 +62,7 @@ ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
while (!m_PendingWork.Wait(UpdateIntervalMS))
{
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
}
RethrowErrors();
@@ -111,7 +114,8 @@ ParallelWork::RethrowErrors()
TEST_CASE("parallellwork.nowork")
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
Work.Wait();
}
@@ -120,7 +124,8 @@ TEST_CASE("parallellwork.basic")
WorkerThreadPool WorkerPool(2);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
@@ -133,7 +138,8 @@ TEST_CASE("parallellwork.throws_in_work")
WorkerThreadPool WorkerPool(2);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 10; I++)
{
Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
@@ -158,7 +164,8 @@ TEST_CASE("parallellwork.throws_in_dispatch")
try
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index bfa0d3c49..a5b342cb0 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -1059,7 +1059,7 @@ ZenServerInstance::Terminate()
const std::filesystem::path BaseDir = m_Env.ProgramBaseDir();
const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
- std::error_code Ec = FindProcess(Executable, RunningProcess);
+ std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false);
if (Ec)
{
throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable));