aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md21
-rw-r--r--VERSION.txt2
-rw-r--r--src/zen/authutils.cpp4
-rw-r--r--src/zen/cmds/builds_cmd.cpp394
-rw-r--r--src/zen/cmds/builds_cmd.h11
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp32
-rw-r--r--src/zen/zen.cpp6
-rw-r--r--src/zencore-test/zencore-test.cpp4
-rw-r--r--src/zencore/commandline.cpp4
-rw-r--r--src/zencore/filesystem.cpp63
-rw-r--r--src/zencore/include/zencore/filesystem.h5
-rw-r--r--src/zencore/include/zencore/string.h53
-rw-r--r--src/zencore/string.cpp36
-rw-r--r--src/zenhttp-test/zenhttp-test.cpp4
-rw-r--r--src/zenhttp/clients/httpclientcpr.cpp8
-rw-r--r--src/zenhttp/clients/httpclientcpr.h2
-rw-r--r--src/zenhttp/servers/httpasio.cpp742
-rw-r--r--src/zennet-test/zennet-test.cpp4
-rw-r--r--src/zenremotestore-test/zenremotestore-test.cpp4
-rw-r--r--src/zenremotestore/builds/buildsavedstate.cpp9
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp246
-rw-r--r--src/zenremotestore/builds/buildstorageutil.cpp15
-rw-r--r--src/zenremotestore/chunking/chunkedcontent.cpp2
-rw-r--r--src/zenremotestore/filesystemutils.cpp367
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h12
-rw-r--r--src/zenremotestore/include/zenremotestore/filesystemutils.h2
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h2
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp173
-rw-r--r--src/zenremotestore/zenremotestore.cpp2
-rw-r--r--src/zenserver-test/zenserver-test.cpp4
-rw-r--r--src/zenserver/diag/diagsvcs.cpp29
-rw-r--r--src/zenserver/frontend/html.zipbin163145 -> 163147 bytes
-rw-r--r--src/zenserver/frontend/html/util/component.js2
-rw-r--r--src/zenserver/main.cpp8
-rw-r--r--src/zenserver/zenserver.cpp4
-rw-r--r--src/zenstore-test/zenstore-test.cpp4
-rw-r--r--src/zenstore/blockstore.cpp24
-rw-r--r--src/zenstore/compactcas.cpp10
-rw-r--r--src/zenstore/projectstore.cpp2
-rw-r--r--src/zentelemetry-test/zentelemetry-test.cpp4
-rw-r--r--src/zenutil-test/zenutil-test.cpp4
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h3
42 files changed, 1683 insertions, 644 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0eb1574fe..47ef6f3e1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,28 @@
##
+- Feature: Added `--exclude-folders` to `zen upload`, `zen download` and `zen diff` to extend the default exclude folders. Each folder name is separated with ';' or ','
+ - The default folders that are always excluded are:
+ - `.unsync`
+ - `.zen`
+ - `.ugs`
+ - `.zen-tmp`
+- Feature: Added `--exclude-extensions` to `zen upload` and `zen diff` to set the file extensions to exclude. Each extension name is separated with ';' or ','
+- Feature: Added `--result-path` option to `zen builds ls` to output structured to a file
+- Improvement: On Windows, the asio HTTP back-end now uses TransmitFile to send data directly from files instead of using memory mapping.
- Improvement: Optimized scavenge lookup performance
- Improvement: Enable limit-overwrite behavior by default
+- Improvement: Validate chunk hashes when dechunking files in oplog import
+- Improvement: Use stream decompression when dechunking files
+- Improvement: When assembling blocks for oplog export, make sure we keep under/at block size limit
+- Improvement: Make cancelling of oplog import more responsive
+- Improvement: Use decompress to composite to avoid allocating a new memory buffer for uncompressed chunks during oplog import
+- Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store
+- Improvement: Reduce lock contention when fetching/checking existence of chunks in block store
+- Improvement: If we fail to create the server mutex, gracefully report the problem and exit without sending error to Sentry
+- Improvement: Excluded folder names are now matched by folder name in subfolders in addition to root level folders
+- Improvement: If the path given in `--oidctoken-exe-path` for zen commands is invalid, exit with error instead of ignoring the argument
- Bugfix: Upstream propagation of Put operations would not retain the overwrite cache policy if it was used
+- Bugfix: `zen oplog-download` failed to initialize build part id causing it to fail to download the build part
+- Bugfix: Windows: Set up UTF8 as current locale to properly handle non-ascii characters
## 5.7.15
- Feature: `zen oplog-export`, `zen oplog-import` and `zen oplog-download` now has options to boost workers
diff --git a/VERSION.txt b/VERSION.txt
index dcfab7848..d6542901d 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-5.7.15 \ No newline at end of file
+5.7.16 \ No newline at end of file
diff --git a/src/zen/authutils.cpp b/src/zen/authutils.cpp
index fdcb8e15d..31db82efd 100644
--- a/src/zen/authutils.cpp
+++ b/src/zen/authutils.cpp
@@ -283,6 +283,10 @@ AuthCommandLineOptions::ParseOptions(cxxopts::Options& Ops,
ClientSettings.AccessTokenProvider =
httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, HostUrl, Quiet, m_OidcTokenUnattended, Hidden);
}
+ else if (!m_OidcTokenAuthExecutablePath.empty())
+ {
+ throw OptionParseException(fmt::format("'--oidctoken-exe-path' ('{}') does not exist", m_OidcTokenAuthExecutablePath), Ops.help());
+ }
if (!ClientSettings.AccessTokenProvider)
{
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 25f66e0ee..d7980cc24 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -427,22 +427,24 @@ namespace {
NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000));
}
- void UploadFolder(OperationLogOutput& Output,
- TransferThreadWorkers& Workers,
- StorageInstance& Storage,
- const Oid& BuildId,
- const Oid& BuildPartId,
- const std::string_view BuildPartName,
- const std::filesystem::path& Path,
- const std::filesystem::path& TempDir,
- const std::filesystem::path& ManifestPath,
- const uint64_t FindBlockMaxCount,
- const uint8_t BlockReuseMinPercentLimit,
- bool AllowMultiparts,
- const CbObject& MetaData,
- bool CreateBuild,
- bool IgnoreExistingBlocks,
- bool UploadToZenCache)
+ void UploadFolder(OperationLogOutput& Output,
+ TransferThreadWorkers& Workers,
+ StorageInstance& Storage,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ const std::string_view BuildPartName,
+ const std::filesystem::path& Path,
+ const std::filesystem::path& TempDir,
+ const std::filesystem::path& ManifestPath,
+ const uint64_t FindBlockMaxCount,
+ const uint8_t BlockReuseMinPercentLimit,
+ bool AllowMultiparts,
+ const CbObject& MetaData,
+ bool CreateBuild,
+ bool IgnoreExistingBlocks,
+ bool UploadToZenCache,
+ const std::vector<std::string>& ExcludeFolders,
+ const std::vector<std::string>& ExcludeExtensions)
{
ProgressBar::SetLogOperationName(ProgressMode, "Upload Folder");
{
@@ -470,8 +472,8 @@ namespace {
.AllowMultiparts = AllowMultiparts,
.IgnoreExistingBlocks = IgnoreExistingBlocks,
.TempDir = TempDir,
- .ExcludeFolders = DefaultExcludeFolders,
- .ExcludeExtensions = DefaultExcludeExtensions,
+ .ExcludeFolders = ExcludeFolders,
+ .ExcludeExtensions = ExcludeExtensions,
.ZenExcludeManifestName = ZenExcludeManifestName,
.NonCompressableExtensions = DefaultSplitOnlyExtensions,
.PopulateCache = UploadToZenCache});
@@ -682,12 +684,13 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
- void VerifyFolder(TransferThreadWorkers& Workers,
- const ChunkedFolderContent& Content,
- const ChunkedContentLookup& Lookup,
- const std::filesystem::path& Path,
- bool VerifyFileHash,
- VerifyFolderStatistics& VerifyFolderStats)
+ void VerifyFolder(TransferThreadWorkers& Workers,
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::filesystem::path& Path,
+ const std::vector<std::string>& ExcludeFolders,
+ bool VerifyFileHash,
+ VerifyFolderStatistics& VerifyFolderStats)
{
ZEN_TRACE_CPU("VerifyFolder");
@@ -704,7 +707,7 @@ namespace {
RwLock ErrorLock;
std::vector<std::string> Errors;
- auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
+ auto IsAcceptedFolder = [ExcludeFolders = ExcludeFolders](const std::string_view& RelativePath) -> bool {
for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
@@ -1593,6 +1596,7 @@ namespace {
uint64_t MaximumInMemoryPayloadSize = 512u * 1024u;
bool PopulateCache = true;
bool AppendNewContent = false;
+ std::vector<std::string> ExcludeFolders;
};
void DownloadFolder(OperationLogOutput& Output,
@@ -1834,8 +1838,7 @@ namespace {
.EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
.EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent,
.ValidateCompletedSequences = Options.PostDownloadVerify,
- .ExcludeFolders = DefaultExcludeFolders,
- .ExcludeExtensions = DefaultExcludeExtensions,
+ .ExcludeFolders = Options.ExcludeFolders,
.MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize,
.PopulateCache = Options.PopulateCache});
{
@@ -1861,7 +1864,13 @@ namespace {
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount);
- VerifyFolder(Workers, RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats);
+ VerifyFolder(Workers,
+ RemoteContent,
+ RemoteLookup,
+ Path,
+ Options.ExcludeFolders,
+ Options.PostDownloadVerify,
+ VerifyFolderStats);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateBuildSaveStateObject(LocalState);
@@ -1999,88 +2008,157 @@ namespace {
const std::vector<Oid>& BuildPartIds,
std::span<const std::string> BuildPartNames,
std::span<const std::string> IncludeWildcards,
- std::span<const std::string> ExcludeWildcards)
+ std::span<const std::string> ExcludeWildcards,
+ CbObjectWriter* OptionalStructuredOutput)
{
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
CbObject BuildObject = GetBuild(*Storage.BuildStorage, BuildId);
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->AddObjectId("buildId"sv, BuildId);
+ OptionalStructuredOutput->AddObject("build"sv, BuildObject);
+ }
+
std::vector<std::pair<Oid, std::string>> AllBuildParts =
ResolveBuildPartNames(BuildObject, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
- Stopwatch GetBuildPartTimer;
-
- for (size_t BuildPartIndex = 0; BuildPartIndex < AllBuildParts.size(); BuildPartIndex++)
+ if (!AllBuildParts.empty())
{
- const Oid BuildPartId = AllBuildParts[BuildPartIndex].first;
- const std::string_view BuildPartName = AllBuildParts[BuildPartIndex].second;
- CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId);
+ Stopwatch GetBuildPartTimer;
- if (!IsQuiet)
+ if (OptionalStructuredOutput != nullptr)
{
- ZEN_CONSOLE("{}Part: {} ('{}'):\n",
- BuildPartIndex > 0 ? "\n" : "",
- BuildPartId,
- BuildPartName,
- NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()),
- NiceBytes(BuildPartManifest.GetSize()));
+ OptionalStructuredOutput->BeginArray("parts"sv);
}
- std::vector<std::filesystem::path> Paths;
- std::vector<IoHash> RawHashes;
- std::vector<uint64_t> RawSizes;
- std::vector<uint32_t> Attributes;
-
- SourcePlatform Platform;
- std::vector<IoHash> SequenceRawHashes;
- std::vector<uint32_t> ChunkCounts;
- std::vector<uint32_t> AbsoluteChunkOrders;
- std::vector<IoHash> LooseChunkHashes;
- std::vector<uint64_t> LooseChunkRawSizes;
- std::vector<IoHash> BlockRawHashes;
+ for (size_t BuildPartIndex = 0; BuildPartIndex < AllBuildParts.size(); BuildPartIndex++)
+ {
+ const Oid BuildPartId = AllBuildParts[BuildPartIndex].first;
+ const std::string_view BuildPartName = AllBuildParts[BuildPartIndex].second;
+ CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId);
- ReadBuildContentFromCompactBinary(BuildPartManifest,
- Platform,
- Paths,
- RawHashes,
- RawSizes,
- Attributes,
- SequenceRawHashes,
- ChunkCounts,
- AbsoluteChunkOrders,
- LooseChunkHashes,
- LooseChunkRawSizes,
- BlockRawHashes);
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->BeginObject();
+ OptionalStructuredOutput->AddObjectId("id"sv, BuildPartId);
+ OptionalStructuredOutput->AddString("partName"sv, BuildPartName);
+ }
+ {
+ if (OptionalStructuredOutput != nullptr)
+ {
+ }
+ else if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}Part: {} ('{}'):\n",
+ BuildPartIndex > 0 ? "\n" : "",
+ BuildPartId,
+ BuildPartName,
+ NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()),
+ NiceBytes(BuildPartManifest.GetSize()));
+ }
- std::vector<size_t> Order(Paths.size());
- std::iota(Order.begin(), Order.end(), 0);
+ std::vector<std::filesystem::path> Paths;
+ std::vector<IoHash> RawHashes;
+ std::vector<uint64_t> RawSizes;
+ std::vector<uint32_t> Attributes;
+
+ SourcePlatform Platform;
+ std::vector<IoHash> SequenceRawHashes;
+ std::vector<uint32_t> ChunkCounts;
+ std::vector<uint32_t> AbsoluteChunkOrders;
+ std::vector<IoHash> LooseChunkHashes;
+ std::vector<uint64_t> LooseChunkRawSizes;
+ std::vector<IoHash> BlockRawHashes;
+
+ ReadBuildContentFromCompactBinary(BuildPartManifest,
+ Platform,
+ Paths,
+ RawHashes,
+ RawSizes,
+ Attributes,
+ SequenceRawHashes,
+ ChunkCounts,
+ AbsoluteChunkOrders,
+ LooseChunkHashes,
+ LooseChunkRawSizes,
+ BlockRawHashes);
+
+ std::vector<size_t> Order(Paths.size());
+ std::iota(Order.begin(), Order.end(), 0);
+
+ std::sort(Order.begin(), Order.end(), [&](size_t Lhs, size_t Rhs) {
+ const std::filesystem::path& LhsPath = Paths[Lhs];
+ const std::filesystem::path& RhsPath = Paths[Rhs];
+ return LhsPath < RhsPath;
+ });
- std::sort(Order.begin(), Order.end(), [&](size_t Lhs, size_t Rhs) {
- const std::filesystem::path& LhsPath = Paths[Lhs];
- const std::filesystem::path& RhsPath = Paths[Rhs];
- return LhsPath < RhsPath;
- });
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->BeginArray("files"sv);
+ }
+ {
+ for (size_t Index : Order)
+ {
+ const std::filesystem::path& Path = Paths[Index];
+ if (IncludePath(IncludeWildcards, ExcludeWildcards, Path))
+ {
+ const IoHash& RawHash = RawHashes[Index];
+ const uint64_t RawSize = RawSizes[Index];
+ const uint32_t Attribute = Attributes[Index];
- for (size_t Index : Order)
- {
- const std::filesystem::path& Path = Paths[Index];
- if (IncludePath(IncludeWildcards, ExcludeWildcards, Path))
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->BeginObject();
+ {
+ OptionalStructuredOutput->AddString("path"sv, fmt::format("{}", Path));
+ OptionalStructuredOutput->AddInteger("rawSize"sv, RawSize);
+ switch (Platform)
+ {
+ case SourcePlatform::Windows:
+ OptionalStructuredOutput->AddInteger("attributes"sv, Attribute);
+ break;
+ case SourcePlatform::MacOS:
+ case SourcePlatform::Linux:
+ OptionalStructuredOutput->AddString("chmod"sv, fmt::format("{:#04o}", Attribute));
+ break;
+ default:
+ throw std::runtime_error(fmt::format("Unsupported platform: {}", (int)Platform));
+ }
+ }
+ OptionalStructuredOutput->EndObject();
+ }
+ else
+ {
+ ZEN_CONSOLE("{}\t{}\t{}", Path, RawSize, RawHash);
+ }
+ }
+ }
+ }
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->EndArray(); // "files"
+ }
+ }
+ if (OptionalStructuredOutput != nullptr)
{
- const IoHash& RawHash = RawHashes[Index];
- const uint64_t RawSize = RawSizes[Index];
- const uint32_t Attribute = Attributes[Index];
- ZEN_UNUSED(Attribute);
-
- ZEN_CONSOLE("{}\t{}\t{}", Path, RawSize, RawHash);
+ OptionalStructuredOutput->EndObject();
}
}
+ if (OptionalStructuredOutput != nullptr)
+ {
+ OptionalStructuredOutput->EndArray(); // parts
+ }
}
}
- void DiffFolders(TransferThreadWorkers& Workers,
- const std::filesystem::path& BasePath,
- const std::filesystem::path& ComparePath,
- bool OnlyChunked)
+ void DiffFolders(TransferThreadWorkers& Workers,
+ const std::filesystem::path& BasePath,
+ const std::filesystem::path& ComparePath,
+ bool OnlyChunked,
+ const std::vector<std::string>& InExcludeFolders,
+ const std::vector<std::string>& InExcludeExtensions)
{
ZEN_TRACE_CPU("DiffFolders");
@@ -2104,7 +2182,7 @@ namespace {
{
StandardChunkingControllerSettings ChunkingSettings;
std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(ChunkingSettings);
- std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions;
+ std::vector<std::string> ExcludeExtensions = InExcludeExtensions;
if (OnlyChunked)
{
ExcludeExtensions.insert(ExcludeExtensions.end(),
@@ -2115,7 +2193,7 @@ namespace {
ChunkingSettings.SplitAndCompressExtensions.end());
}
- auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
+ auto IsAcceptedFolder = [ExcludeFolders = InExcludeFolders](const std::string_view& RelativePath) -> bool {
for (const std::string& ExcludeFolder : ExcludeFolders)
{
if (RelativePath.starts_with(ExcludeFolder))
@@ -2404,6 +2482,25 @@ BuildsCommand::BuildsCommand()
"<excludewildcard>");
};
+ auto AddExcludeFolderOption = [this](cxxopts::Options& Ops) {
+ Ops.add_option("",
+ "",
+ "exclude-folders",
+ "Names of folders to exclude, separated by ;",
+ cxxopts::value(m_ExcludeFolders),
+ "<excludefolders>");
+ };
+
+ auto AddExcludeExtensionsOption = [this](cxxopts::Options& Ops) {
+ Ops.add_option("",
+ "",
+ "exclude-extensions",
+ "Extensions to exclude, separated by ;"
+ "include filter",
+ cxxopts::value(m_ExcludeExtensions),
+ "<excludeextensions>");
+ };
+
auto AddMultipartOptions = [this](cxxopts::Options& Ops) {
Ops.add_option("",
"",
@@ -2463,12 +2560,13 @@ BuildsCommand::BuildsCommand()
"Enable fetch of buckets within namespaces also",
cxxopts::value(m_ListNamespacesRecursive),
"<recursive>");
- m_ListNamespacesOptions.add_option("",
- "",
- "result-path",
- "Path to json or compactbinary to write query result to",
- cxxopts::value(m_ListResultPath),
- "<result-path>");
+ m_ListNamespacesOptions.add_option(
+ "",
+ "",
+ "result-path",
+ "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console",
+ cxxopts::value(m_ListResultPath),
+ "<result-path>");
m_ListNamespacesOptions.parse_positional({"result-path"});
m_ListNamespacesOptions.positional_help("result-path");
@@ -2488,7 +2586,7 @@ BuildsCommand::BuildsCommand()
m_ListOptions.add_option("",
"",
"result-path",
- "Path to json or compactbinary to write query result to",
+ "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console",
cxxopts::value(m_ListResultPath),
"<result-path>");
m_ListOptions.parse_positional({"query-path", "result-path"});
@@ -2503,7 +2601,7 @@ BuildsCommand::BuildsCommand()
m_ListBlocksOptions.add_option("",
"",
"result-path",
- "Path to json or compactbinary to write query result to",
+ "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console",
cxxopts::value(m_ListResultPath),
"<result-path>");
@@ -2521,6 +2619,8 @@ BuildsCommand::BuildsCommand()
AddCacheOptions(m_UploadOptions);
AddWorkerOptions(m_UploadOptions);
AddZenFolderOptions(m_UploadOptions);
+ AddExcludeFolderOption(m_UploadOptions);
+ AddExcludeExtensionsOption(m_UploadOptions);
m_UploadOptions.add_options()("h,help", "Print help");
m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>");
m_UploadOptions.add_option("",
@@ -2599,6 +2699,7 @@ BuildsCommand::BuildsCommand()
AddWorkerOptions(m_DownloadOptions);
AddWildcardOptions(m_DownloadOptions);
AddAppendNewContentOptions(m_DownloadOptions);
+ AddExcludeFolderOption(m_DownloadOptions);
m_DownloadOptions.add_option("cache",
"",
@@ -2691,12 +2792,28 @@ BuildsCommand::BuildsCommand()
cxxopts::value(m_BuildPartNames),
"<name>");
+ m_LsOptions.add_option("",
+ "",
+ "result-path",
+ "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console",
+ cxxopts::value(m_LsResultPath),
+ "<result-path>");
+
+ m_LsOptions.add_option("",
+ "o",
+ "output-path",
+ "Path to output, extension .json or .cb (compac binary). Default is output to console",
+ cxxopts::value(m_LsResultPath),
+ "<output-path>");
+
m_LsOptions.parse_positional({"build-id", "wildcard"});
m_LsOptions.positional_help("build-id wildcard");
// diff
AddOutputOptions(m_DiffOptions);
AddWorkerOptions(m_DiffOptions);
+ AddExcludeFolderOption(m_DiffOptions);
+ AddExcludeExtensionsOption(m_DiffOptions);
m_DiffOptions.add_options()("h,help", "Print help");
m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>");
@@ -3188,6 +3305,28 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
OutExcludeWildcards = SplitWildcard(m_ExcludeWildcard);
};
+ auto ParseExcludeFolderAndExtension = [&](std::vector<std::string>& OutExcludeFolders, std::vector<std::string>& OutExcludeExtensions) {
+ auto SplitExclusion = [](const std::string_view Input) -> std::vector<std::string> {
+ std::vector<std::string> Exclusions;
+ ForEachStrTok(Input, ";,", [&Exclusions](std::string_view Exclusion) {
+ if (!Exclusion.empty())
+ {
+ std::string CleanExclusion(ToLower(Exclusion));
+ if (CleanExclusion.length() > 2 && CleanExclusion.front() == '"' && CleanExclusion.back() == '"')
+ {
+ CleanExclusion = CleanExclusion.substr(1, CleanExclusion.length() - 2);
+ }
+ Exclusions.emplace_back(std::move(CleanExclusion));
+ }
+ return true;
+ });
+ return Exclusions;
+ };
+
+ OutExcludeFolders = SplitExclusion(m_ExcludeFolders);
+ OutExcludeExtensions = SplitExclusion(m_ExcludeExtensions);
+ };
+
auto ParseDiffPath = [&]() {
if (m_DiffPath.empty())
{
@@ -3600,6 +3739,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath);
+ std::vector<std::string> ExcludeFolders = DefaultExcludeFolders;
+ std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions;
+ ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions);
+
UploadFolder(*Output,
Workers,
Storage,
@@ -3615,7 +3758,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MetaData,
m_CreateBuild,
m_Clean,
- m_UploadToZenCache);
+ m_UploadToZenCache,
+ ExcludeFolders,
+ ExcludeExtensions);
if (!AbortFlag)
{
@@ -3737,6 +3882,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw OptionParseException("'--append' conflicts with '--clean'", SubOption->help());
}
+ std::vector<std::string> ExcludeFolders = DefaultExcludeFolders;
+ std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions;
+ ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions);
+
DownloadFolder(*Output,
Workers,
Storage,
@@ -3759,7 +3908,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
.ExcludeWildcards = ExcludeWildcards,
.MaximumInMemoryPayloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory),
.PopulateCache = m_UploadToZenCache,
- .AppendNewContent = m_AppendNewContent});
+ .AppendNewContent = m_AppendNewContent,
+ .ExcludeFolders = ExcludeFolders});
if (AbortFlag)
{
@@ -3769,9 +3919,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_LsOptions)
{
- if (!IsQuiet)
+ if (!m_LsResultPath.empty())
{
- LogExecutableVersionAndPid();
+ if (!IsQuiet)
+ {
+ LogExecutableVersionAndPid();
+ }
}
ZenState InstanceState;
@@ -3801,7 +3954,30 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::vector<Oid> BuildPartIds = ParseBuildPartIds();
std::vector<std::string> BuildPartNames = ParseBuildPartNames();
- ListBuild(Storage, BuildId, BuildPartIds, BuildPartNames, IncludeWildcards, ExcludeWildcards);
+ std::unique_ptr<CbObjectWriter> StructuredOutput;
+ if (!m_LsResultPath.empty())
+ {
+ MakeSafeAbsolutePathÍnPlace(m_LsResultPath);
+ StructuredOutput = std::make_unique<CbObjectWriter>();
+ }
+
+ ListBuild(Storage, BuildId, BuildPartIds, BuildPartNames, IncludeWildcards, ExcludeWildcards, StructuredOutput.get());
+
+ if (StructuredOutput)
+ {
+ CbObject Response = StructuredOutput->Save();
+ if (ToLower(m_LsResultPath.extension().string()) == ".cbo")
+ {
+ MemoryView ResponseView = Response.GetView();
+ WriteFile(m_LsResultPath, IoBuffer(IoBuffer::Wrap, ResponseView.GetData(), ResponseView.GetSize()));
+ }
+ else
+ {
+ ExtendableStringBuilder<1024> SB;
+ CompactBinaryToJson(Response.GetView(), SB);
+ WriteFile(m_LsResultPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+ }
+ }
if (AbortFlag)
{
@@ -3825,7 +4001,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ParsePath();
ParseDiffPath();
- DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked);
+ std::vector<std::string> ExcludeFolders = DefaultExcludeFolders;
+ std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions;
+ ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions);
+
+ DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked, ExcludeFolders, ExcludeExtensions);
if (AbortFlag)
{
throw std::runtime_error("Diff folders aborted");
@@ -4245,7 +4425,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MetaData,
true,
false,
- m_UploadToZenCache);
+ m_UploadToZenCache,
+ DefaultExcludeFolders,
+ DefaultExcludeExtensions);
if (AbortFlag)
{
throw std::runtime_error("Test aborted. (Upload build)");
@@ -4542,7 +4724,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MetaData2,
true,
false,
- m_UploadToZenCache);
+ m_UploadToZenCache,
+ DefaultExcludeFolders,
+ DefaultExcludeExtensions);
if (AbortFlag)
{
throw std::runtime_error("Test aborted. (Upload scrambled)");
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 081a3a460..80c64c48d 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -90,6 +90,12 @@ private:
std::filesystem::path m_Path;
+ std::string m_IncludeWildcard;
+ std::string m_ExcludeWildcard;
+
+ std::string m_ExcludeFolders;
+ std::string m_ExcludeExtensions;
+
cxxopts::Options m_UploadOptions{"upload", "Upload a folder"};
uint64_t m_FindBlockMaxCount = 10000;
bool m_PostUploadVerify = false;
@@ -100,9 +106,8 @@ private:
bool m_PostDownloadVerify = false;
bool m_EnableScavenging = true;
- cxxopts::Options m_LsOptions{"ls", "List the content of uploaded build"};
- std::string m_IncludeWildcard;
- std::string m_ExcludeWildcard;
+ cxxopts::Options m_LsOptions{"ls", "List the content of uploaded build"};
+ std::filesystem::path m_LsResultPath;
cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"};
std::filesystem::path m_DiffPath;
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 15fb1f16c..4885fd363 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -47,12 +47,13 @@ namespace {
#define ZEN_CLOUD_STORAGE "Cloud Storage"
- void WriteAuthOptions(CbObjectWriter& Writer,
- std::string_view JupiterOpenIdProvider,
- std::string_view JupiterAccessToken,
- std::string_view JupiterAccessTokenEnv,
- std::string_view JupiterAccessTokenPath,
- std::string_view OidcTokenAuthExecutablePath)
+ void WriteAuthOptions(CbObjectWriter& Writer,
+ std::string_view JupiterOpenIdProvider,
+ std::string_view JupiterAccessToken,
+ std::string_view JupiterAccessTokenEnv,
+ std::string_view JupiterAccessTokenPath,
+ std::string_view OidcTokenAuthExecutablePath,
+ cxxopts::Options& Options)
{
if (!JupiterOpenIdProvider.empty())
{
@@ -87,6 +88,11 @@ namespace {
{
Writer.AddString("oidc-exe-path"sv, OidcTokenExePath.generic_string());
}
+ else if (!OidcTokenAuthExecutablePath.empty())
+ {
+ throw OptionParseException(fmt::format("'--oidctoken-exe-path' ('{}') does not exist", OidcTokenAuthExecutablePath),
+ Options.help());
+ }
}
IoBuffer MakeCbObjectPayload(std::function<void(CbObjectWriter& Writer)> WriteCB)
@@ -1234,7 +1240,8 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
m_JupiterAccessToken,
m_JupiterAccessTokenEnv,
m_JupiterAccessTokenPath,
- m_OidcTokenAuthExecutablePath);
+ m_OidcTokenAuthExecutablePath,
+ m_Options);
if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
@@ -1270,7 +1277,8 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
m_JupiterAccessToken,
m_JupiterAccessTokenEnv,
m_JupiterAccessTokenPath,
- m_OidcTokenAuthExecutablePath);
+ m_OidcTokenAuthExecutablePath,
+ m_Options);
if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
@@ -1664,7 +1672,8 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
m_JupiterAccessToken,
m_JupiterAccessTokenEnv,
m_JupiterAccessTokenPath,
- m_OidcTokenAuthExecutablePath);
+ m_OidcTokenAuthExecutablePath,
+ m_Options);
if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
@@ -1689,7 +1698,8 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
m_JupiterAccessToken,
m_JupiterAccessTokenEnv,
m_JupiterAccessTokenPath,
- m_OidcTokenAuthExecutablePath);
+ m_OidcTokenAuthExecutablePath,
+ m_Options);
if (m_JupiterAssumeHttp2)
{
Writer.AddBool("assumehttp2"sv, true);
@@ -2634,8 +2644,6 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
BuildId,
{.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = m_ForceDownload, .TempFolderPath = StorageTempPath});
- const Oid OplogBuildPartId = State.GetBuildPartId();
-
if (!m_Attachments.empty())
{
if (m_AttachmentsPath.empty())
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 4d4966222..c03ae476f 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -296,11 +296,15 @@ ZenCmdBase::LogExecutableVersionAndPid()
int
main(int argc, char** argv)
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
zen::SetCurrentThreadName("main");
std::vector<std::string> Args;
#if ZEN_PLATFORM_WINDOWS
- LPWSTR RawCommandLine = GetCommandLine();
+ LPWSTR RawCommandLine = GetCommandLineW();
std::string CommandLine = zen::WideToUtf8(RawCommandLine);
Args = zen::ParseCommandLine(CommandLine);
#else
diff --git a/src/zencore-test/zencore-test.cpp b/src/zencore-test/zencore-test.cpp
index 327550b32..68fc940ee 100644
--- a/src/zencore-test/zencore-test.cpp
+++ b/src/zencore-test/zencore-test.cpp
@@ -19,6 +19,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zencore_forcelinktests();
diff --git a/src/zencore/commandline.cpp b/src/zencore/commandline.cpp
index 78260aeef..426cf23d6 100644
--- a/src/zencore/commandline.cpp
+++ b/src/zencore/commandline.cpp
@@ -22,6 +22,10 @@ void
IterateCommandlineArgs(std::function<void(const std::string_view& Arg)>& ProcessArg)
{
#if ZEN_PLATFORM_WINDOWS
+ // It might seem odd to do this here in addition to at start of main functions but the InitGMalloc() function is called before main (via
+ // static data) so we need to make sure we set the locale before parsing the command line
+ setlocale(LC_ALL, "en_us.UTF8");
+
int ArgC = 0;
const LPWSTR CmdLine = ::GetCommandLineW();
const LPWSTR* ArgV = ::CommandLineToArgvW(CmdLine, &ArgC);
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 7f341818b..4da412c17 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2669,36 +2669,41 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
}
if (EnumHasAnyFlags(Flags, DirectoryContentFlags::Recursive))
{
- PendingWorkCount.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork(
- [WorkerPool = &WorkerPool,
- PendingWorkCount = &PendingWorkCount,
- Visitor = Visitor,
- Flags = Flags,
- Path = std::move(Path),
- RelativeRoot = RelativeRoot / DirectoryName]() {
- ZEN_ASSERT(Visitor);
- auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
- try
- {
- MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(Path, SubVisitor);
- Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
- }
- },
- WorkerThreadPool::EMode::DisableBacklog);
- }
- catch (const std::exception& Ex)
+ if (Visitor->AsyncAllowDirectory(Parent, DirectoryName))
{
- ZEN_ERROR("Failed scheduling work to scan folder '{}'. Reason: '{}'", Path, Ex.what());
- PendingWorkCount.CountDown();
+ PendingWorkCount.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork(
+ [WorkerPool = &WorkerPool,
+ PendingWorkCount = &PendingWorkCount,
+ Visitor = Visitor,
+ Flags = Flags,
+ Path = std::move(Path),
+ RelativeRoot = RelativeRoot / DirectoryName]() {
+ ZEN_ASSERT(Visitor);
+ auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
+ try
+ {
+ MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
+ FileSystemTraversal Traversal;
+ Traversal.TraverseFileSystem(Path, SubVisitor);
+ Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'",
+ Path / RelativeRoot,
+ Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling work to scan folder '{}'. Reason: '{}'", Path, Ex.what());
+ PendingWorkCount.CountDown();
+ }
}
}
return false;
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index b4906aebf..938a05b59 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -368,6 +368,11 @@ public:
std::vector<std::filesystem::path> DirectoryNames;
std::vector<uint32_t> DirectoryAttributes;
};
+ virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const
+ {
+ ZEN_UNUSED(Parent, DirectoryName);
+ return true;
+ }
virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) = 0;
};
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index 93f8add0a..4379f2f80 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -821,19 +821,66 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func)
while (It != End)
{
- if (*It == Delim)
+ std::string_view Remaining{It, size_t(ptrdiff_t(End - It))};
+ size_t Idx = Remaining.find(Delim, 0);
+ if (Idx == 0)
{
It++;
continue;
}
+ size_t DelimSize = 0;
+
+ if (Idx == std::string_view::npos)
+ {
+ Idx = Remaining.size();
+ }
+ else
+ {
+ DelimSize = 1;
+ }
+
+ Count++;
+ std::string_view Token{It, Idx};
+ if (!Func(Token))
+ {
+ break;
+ }
+
+ It = It + (Idx + DelimSize);
+ }
+
+ return Count;
+}
+
+template<typename Fn>
+uint32_t
+ForEachStrTok(const std::string_view& Str, const char* Delims, Fn&& Func)
+{
+ const char* It = Str.data();
+ const char* End = It + Str.length();
+ uint32_t Count = 0;
+
+ while (It != End)
+ {
std::string_view Remaining{It, size_t(ptrdiff_t(End - It))};
- size_t Idx = Remaining.find(Delim, 0);
+ size_t Idx = Remaining.find_first_of(Delims, 0);
+ if (Idx == 0)
+ {
+ It++;
+ continue;
+ }
+
+ size_t DelimSize = 0;
if (Idx == std::string_view::npos)
{
Idx = Remaining.size();
}
+ else
+ {
+ DelimSize = 1;
+ }
Count++;
std::string_view Token{It, Idx};
@@ -842,7 +889,7 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func)
break;
}
- It = It + Idx;
+ It = It + (Idx + DelimSize);
}
return Count;
diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp
index c8c7c2cde..0ee863b74 100644
--- a/src/zencore/string.cpp
+++ b/src/zencore/string.cpp
@@ -1067,6 +1067,42 @@ TEST_CASE("string")
TokenCount = ForEachStrTok(""sv, ',', [](const std::string_view&) { return true; });
CHECK(TokenCount == ExpectedTokenCount);
}
+
+ SUBCASE("ForEachStrTok2")
+ {
+ const auto Tokens = "here,is;my,different tokens"sv;
+ const auto ExpectedTokens = "here,is,my,different,tokens"sv;
+ int32_t ExpectedTokenCount = 5;
+ int32_t TokenCount = 0;
+ StringBuilder<512> Sb;
+
+ TokenCount = ForEachStrTok(Tokens, " ,;", [&Sb](const std::string_view& Token) {
+ if (Sb.Size())
+ {
+ Sb << ",";
+ }
+ Sb << Token;
+ return true;
+ });
+
+ CHECK(TokenCount == ExpectedTokenCount);
+ CHECK(Sb.ToString() == ExpectedTokens);
+
+ ExpectedTokenCount = 1;
+ const auto Str = "mosdef"sv;
+
+ Sb.Reset();
+ TokenCount = ForEachStrTok(Str, " ,;", [&Sb](const std::string_view& Token) {
+ Sb << Token;
+ return true;
+ });
+ CHECK(Sb.ToString() == Str);
+ CHECK(TokenCount == ExpectedTokenCount);
+
+ ExpectedTokenCount = 0;
+ TokenCount = ForEachStrTok(""sv, " ,;", [](const std::string_view&) { return true; });
+ CHECK(TokenCount == ExpectedTokenCount);
+ }
}
#endif
diff --git a/src/zenhttp-test/zenhttp-test.cpp b/src/zenhttp-test/zenhttp-test.cpp
index d18b2167e..c18759beb 100644
--- a/src/zenhttp-test/zenhttp-test.cpp
+++ b/src/zenhttp-test/zenhttp-test.cpp
@@ -15,6 +15,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zenhttp_forcelinktests();
diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp
index 987cdd706..5d92b3b6b 100644
--- a/src/zenhttp/clients/httpclientcpr.cpp
+++ b/src/zenhttp/clients/httpclientcpr.cpp
@@ -163,7 +163,7 @@ CprHttpClient::~CprHttpClient()
HttpClient::Response
CprHttpClient::ResponseWithPayload(std::string_view SessionId,
- cpr::Response& HttpResponse,
+ cpr::Response&& HttpResponse,
const HttpResponseCode WorkResponseCode,
IoBuffer&& Payload)
{
@@ -235,11 +235,7 @@ CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpRe
}
else
{
- return ResponseWithPayload(
- SessionId,
- HttpResponse,
- WorkResponseCode,
- Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size()));
+ return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload));
}
}
diff --git a/src/zenhttp/clients/httpclientcpr.h b/src/zenhttp/clients/httpclientcpr.h
index 94d57fb43..40af53b5d 100644
--- a/src/zenhttp/clients/httpclientcpr.h
+++ b/src/zenhttp/clients/httpclientcpr.h
@@ -160,7 +160,7 @@ private:
HttpClient::Response CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload);
HttpClient::Response ResponseWithPayload(std::string_view SessionId,
- cpr::Response& HttpResponse,
+ cpr::Response&& HttpResponse,
const HttpResponseCode WorkResponseCode,
IoBuffer&& Payload);
};
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index a0431b0b1..be4e73576 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -4,6 +4,7 @@
#include "httptracer.h"
#include <zencore/except.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
#include <zencore/thread.h>
@@ -13,6 +14,8 @@
#include "httpparser.h"
+#include <EASTL/fixed_vector.h>
+
#include <deque>
#include <memory>
#include <string_view>
@@ -28,6 +31,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
# include <errno.h>
#endif
#include <asio.hpp>
+#include <asio/stream_file.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
#define ASIO_VERBOSE_TRACE 0
@@ -154,6 +158,345 @@ Log()
//////////////////////////////////////////////////////////////////////////
+#if !defined(ASIO_HAS_FILE)
+# define ASIO_HAS_FILE 0
+#endif
+
+#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
+# define ZEN_USE_TRANSMITFILE 1
+# define ZEN_USE_ASYNC_SENDFILE ASIO_HAS_FILE
+#else
+# define ZEN_USE_TRANSMITFILE 0
+# define ZEN_USE_ASYNC_SENDFILE 0
+#endif
+
+#if ZEN_USE_TRANSMITFILE
+template<typename Handler>
+void
+TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb)
+{
+ // We need to establish a new handle here to avoid running into random errors
+ // during TransmitFile. I'm not entirely sure why it's necessary yet.
+
+ HANDLE hReopenedFile =
+ ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED);
+
+ const uint64_t FileSize = FileSizeFromHandle(FileHandle);
+ const uint64_t SendEndOffset = ByteOffset + ByteSize;
+
+ if (SendEndOffset > FileSize)
+ {
+ std::error_code DummyEc;
+
+ ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many",
+ ByteOffset,
+ ByteSize,
+ PathFromHandle(FileHandle, DummyEc),
+ FileSizeFromHandle(FileHandle),
+ SendEndOffset - FileSize);
+ }
+
+ asio::windows::overlapped_ptr OverlappedPtr(
+ Socket.get_executor(),
+ [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize, hReopenedFile](
+ const std::error_code& Ec,
+ std::size_t ActualBytesTransferred) {
+ if (Ec)
+ {
+ std::error_code DummyEc;
+ ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes",
+ ByteOffset,
+ ByteSize,
+ PathFromHandle(hReopenedFile, DummyEc),
+ FileSizeFromHandle(hReopenedFile),
+ Ec.message(),
+ ActualBytesTransferred);
+ }
+
+ CloseHandle(hReopenedFile);
+ WrappedCb(Ec, ActualBytesTransferred);
+ });
+
+ OVERLAPPED* RawOverlapped = OverlappedPtr.get();
+ RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull);
+ RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32);
+
+ const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide
+
+ const BOOL Ok =
+ ::TransmitFile(Socket.native_handle(), hReopenedFile, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0);
+ const DWORD LastError = ::GetLastError();
+
+ // Check if the operation completed immediately.
+ if (!Ok && LastError != ERROR_IO_PENDING)
+ {
+ // The operation completed immediately, so a completion notification needs
+ // to be posted. When complete() is called, ownership of the OVERLAPPED-
+ // derived object passes to the io_context.
+
+ asio::error_code ec(LastError, asio::error::get_system_category());
+ OverlappedPtr.complete(ec, 0);
+ }
+ else
+ {
+ // The operation was successfully initiated, so ownership of the
+ // OVERLAPPED-derived object has passed to the io_context.
+
+ OverlappedPtr.release();
+ }
+}
+#endif // ZEN_USE_TRANSMITFILE
+
+#if ZEN_USE_ASYNC_SENDFILE
+
+// Pipelined file sender that reads from a file and writes to a socket using two buffers
+// to pipeline reads and writes. Unfortunately this strategy can't currently be used on
+// non-Windows platforms as they don't currently support async file reads. We'll have
+// to build a mechanism using a thread pool for that, perhaps with optional support
+// for io_uring where available since that should be the most efficient.
+//
+// In other words, this is not super useful as Windows already has the TransmitFile
+// version above, but it's here for completeness and potential future use on other platforms.
+
+template<class AsyncWriteStream, class CompletionHandler>
+class PipelinedFileSender : public std::enable_shared_from_this<PipelinedFileSender<AsyncWriteStream, CompletionHandler>>
+{
+public:
+ PipelinedFileSender(AsyncWriteStream& WriteSocket,
+ asio::stream_file&& FileToReadFrom,
+ uint64_t ByteSize,
+ std::size_t BufferSize,
+ CompletionHandler&& CompletionToken)
+ : m_WriteSocket(WriteSocket)
+ , m_SourceFile(std::move(FileToReadFrom))
+ , m_Strand(asio::make_strand(m_WriteSocket.get_executor()))
+ , m_Buffers{std::vector<char>(BufferSize), std::vector<char>(BufferSize)}
+ , m_CompletionHandler(std::move(CompletionToken))
+ , m_TotalBytesToRead(ByteSize)
+ , m_RemainingBytesToRead(ByteSize)
+ {
+ }
+
+ void Start() { EnqueueRead(); }
+
+private:
+ struct Slot
+ {
+ std::size_t Size = 0; // valid bytes in buffer
+ bool Ready = false; // has unwritten data
+ bool InUse = false; // being written
+ };
+
+ void OnSendCompleted(const std::error_code& Ec)
+ {
+ // TODO: ensure this behaves properly for instance if the write fails while a read is pending
+
+ if (m_SendCompleted)
+ {
+ return;
+ }
+
+ m_SendCompleted = true;
+
+ // Ensure completion runs on the strand/executor.
+
+ asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() {
+ CompletionHandler(Ec, TotalBytesWritten);
+ });
+ }
+
+ void EnqueueRead()
+ {
+ asio::dispatch(m_Strand, [self = this->shared_from_this()] {
+ self->TryPostRead();
+ self->PumpWrites();
+ });
+ }
+
+ void TryPostRead()
+ {
+ if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0)
+ {
+ return;
+ }
+
+ const int ReadSlotIndex = GetFreeSlotIndex();
+ if (ReadSlotIndex < 0)
+ {
+ // no free slot; wait for writer to free one (not meant to ever happen)
+ return;
+ }
+
+ m_ReadInFlight = true;
+ auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead));
+
+ asio::async_read(
+ m_SourceFile,
+ ReadBuffer,
+ asio::bind_executor(m_Strand,
+ [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) {
+ self->m_ReadInFlight = false;
+ self->m_RemainingBytesToRead -= ActualBytesRead;
+
+ if (Ec)
+ {
+ if (Ec == asio::error::eof)
+ {
+ ZEN_ASSERT(self->m_RemainingBytesToRead == 0);
+
+ self->m_IsEof = true;
+
+ // No data produced on EOF; just try to pump whatever is left
+ self->PumpWrites();
+ }
+ else
+ {
+ // read error, cancel everything and let outer completion handler know
+ self->OnSendCompleted(Ec);
+ }
+ }
+ else
+ {
+ // Mark slot as ready with ActualBytesRead valid bytes of data in buffer
+ self->m_Slots[ReadSlotIndex].Size = ActualBytesRead;
+ self->m_Slots[ReadSlotIndex].Ready = true;
+ self->PumpWrites();
+ self->TryPostRead();
+ }
+ }));
+ }
+
+ void PumpWrites()
+ {
+ if (m_WriteInFlight)
+ {
+ return;
+ }
+
+ const int WriteSlotIndex = GetReadySlotIndex();
+ if (WriteSlotIndex < 0)
+ {
+ // No ready data. We're done if EOF/no more data to read and no reads in flight and nothing ready
+ if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0))
+ {
+ // all done
+ return OnSendCompleted({});
+ }
+
+ return;
+ }
+
+ m_WriteInFlight = true;
+ m_Slots[WriteSlotIndex].InUse = true;
+
+ asio::async_write(
+ m_WriteSocket,
+ asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size),
+ asio::bind_executor(m_Strand,
+ [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) {
+ self->m_TotalBytesWritten += BytesWritten;
+ self->m_WriteInFlight = false;
+
+ if (Ec)
+ {
+ self->OnSendCompleted(Ec);
+
+ return;
+ }
+ else
+ {
+ // Free the slot
+ self->m_Slots[WriteSlotIndex].Ready = false;
+ self->m_Slots[WriteSlotIndex].InUse = false;
+ self->m_Slots[WriteSlotIndex].Size = 0;
+
+ self->TryPostRead();
+ self->PumpWrites();
+ }
+ }));
+ }
+
+ int GetFreeSlotIndex() const
+ {
+ for (int i = 0; i < 2; ++i)
+ {
+ if (!m_Slots[i].Ready && !m_Slots[i].InUse)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ int GetReadySlotIndex() const
+ {
+ for (int i = 0; i < 2; ++i)
+ {
+ if (m_Slots[i].Ready && !m_Slots[i].InUse)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ AsyncWriteStream& m_WriteSocket;
+ asio::stream_file m_SourceFile;
+ asio::strand<asio::any_io_executor> m_Strand;
+
+ // There's no synchronization needed for these as all access is via the strand
+ std::vector<char> m_Buffers[2];
+ Slot m_Slots[2];
+
+ bool m_IsEof = false;
+ bool m_ReadInFlight = false;
+ bool m_WriteInFlight = false;
+ bool m_SendCompleted = false;
+
+ const uint64_t m_TotalBytesToRead = 0;
+ uint64_t m_RemainingBytesToRead = 0;
+ uint64_t m_TotalBytesWritten = 0;
+
+ CompletionHandler m_CompletionHandler;
+};
+
+template<class AsyncWriteStream, class CompletionHandler>
+void
+SendFileAsync(AsyncWriteStream& WriteSocket,
+ const auto FileHandle,
+ uint64_t ByteOffset,
+ uint64_t ByteSize,
+ std::size_t BufferSize,
+ CompletionHandler&& CompletionToken)
+{
+ HANDLE hReopenedFile =
+ ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED);
+
+ // Note that this assumes ownership of the handle
+ asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile);
+
+ // TODO: handle any error properly here
+ SourceFile.seek(ByteOffset, asio::stream_file::seek_set);
+
+ if (BufferSize > ByteSize)
+ {
+ BufferSize = ByteSize;
+ }
+
+ auto op = std::make_shared<PipelinedFileSender<AsyncWriteStream, std::decay_t<CompletionHandler>>>(
+ WriteSocket,
+ std::move(SourceFile),
+ ByteSize,
+ BufferSize,
+ std::forward<CompletionHandler>(CompletionToken));
+
+ // Start the pipeline
+ op->Start();
+}
+#endif // ZEN_USE_ASYNC_SENDFILE
+
+//////////////////////////////////////////////////////////////////////////
+
struct HttpAsioServerImpl
{
public:
@@ -191,7 +534,7 @@ public:
class HttpAsioServerRequest : public HttpServerRequest
{
public:
- HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer);
+ HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber);
~HttpAsioServerRequest();
virtual Oid ParseSessionId() const override;
@@ -210,6 +553,7 @@ public:
HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete;
HttpRequestParser& m_Request;
+ uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers
IoBuffer m_PayloadBuffer;
std::unique_ptr<HttpResponse> m_Response;
};
@@ -218,7 +562,11 @@ struct HttpResponse
{
public:
HttpResponse() = default;
- explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {}
+ explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType)
+ {
+ }
+
+ ~HttpResponse() = default;
void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList)
{
@@ -230,57 +578,61 @@ public:
const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size());
m_DataBuffers.reserve(ChunkCount);
+ m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */;
+ m_IoVecs.resize(m_IoVecCount);
- for (IoBuffer& Buffer : BlobList)
- {
-#if 1
- m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned();
-#else
- IoBuffer TempBuffer = std::move(Buffer);
- TempBuffer.MakeOwned();
- m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer));
-#endif
- }
+ m_IoVecCursor = 0;
uint64_t LocalDataSize = 0;
+ int Index = 1;
- m_AsioBuffers.push_back({}); // Placeholder for header
-
- for (IoBuffer& Buffer : m_DataBuffers)
+ for (IoBuffer& Buffer : BlobList)
{
- uint64_t BufferDataSize = Buffer.Size();
+ const uint64_t BufferDataSize = Buffer.Size();
ZEN_ASSERT(BufferDataSize);
LocalDataSize += BufferDataSize;
- IoBufferFileReference FileRef;
- if (Buffer.GetFileReference(/* out */ FileRef))
- {
- // TODO: Use direct file transfer, via TransmitFile/sendfile
- //
- // this looks like it requires some custom asio plumbing however
+ IoBuffer OwnedBuffer = std::move(Buffer);
+ OwnedBuffer.MakeOwned();
- m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
- }
- else
+ IoVec& Io = m_IoVecs[Index++];
+
+ bool ChunkHandled = false;
+
+#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE
+ if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef))
{
- // Send from memory
+ Io.IsFileRef = true;
+ Io.Ref.FileRef = FileRef;
+ ChunkHandled = true;
+ }
+#endif
- m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()});
+ if (!ChunkHandled)
+ {
+ Io.IsFileRef = false;
+ uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size());
+ Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size};
}
+
+ m_DataBuffers.emplace_back(OwnedBuffer);
}
+
m_ContentLength = LocalDataSize;
std::string_view Headers = GetHeaders();
- m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size());
+
+ IoVec& Io = m_IoVecs[0];
+
+ Io.IsFileRef = false;
+ Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())};
}
uint16_t ResponseCode() const { return m_ResponseCode; }
uint64_t ContentLength() const { return m_ContentLength; }
- const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; }
-
std::string_view GetHeaders()
{
ZEN_MEMSCOPE(GetHttpasioTag());
@@ -299,16 +651,137 @@ public:
return m_Headers;
}
- void SuppressPayload() { m_AsioBuffers.resize(1); }
+ void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token)
+ {
+ m_SendCb = std::move(Token);
+
+ SendNextChunk(TcpSocket);
+ }
+
+ void SendNextChunk(asio::ip::tcp::socket& TcpSocket)
+ {
+ if (m_IoVecCursor == m_IoVecCount)
+ {
+ ZEN_ASSERT(m_SendCb);
+
+ auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); };
+
+ asio::defer(TcpSocket.get_executor(), std::move(CompletionToken));
+
+ return;
+ }
+
+ const IoVec& Io = m_IoVecs[m_IoVecCursor++];
+
+ if (Io.IsFileRef)
+ {
+ ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize);
+
+#if ZEN_USE_TRANSMITFILE
+ TransmitFileAsync(TcpSocket,
+ Io.Ref.FileRef.FileHandle,
+ Io.Ref.FileRef.FileChunkOffset,
+ gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize),
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+#elif ZEN_USE_ASYNC_SENDFILE
+ SendFileAsync(TcpSocket,
+ Io.Ref.FileRef.FileHandle,
+ Io.Ref.FileRef.FileChunkOffset,
+ Io.Ref.FileRef.FileChunkSize,
+ 64 * 1024,
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+#else
+ // This should never occur unless we compile with one
+ // of the options above
+ ZEN_ASSERT("invalid file reference in response");
+#endif
+
+ return;
+ }
+
+ // Send as many consecutive non-file references as possible in one asio operation
+
+ std::vector<asio::const_buffer> AsioBuffers;
+ AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size});
+
+ while (m_IoVecCursor != m_IoVecCount)
+ {
+ const IoVec& Io2 = m_IoVecs[m_IoVecCursor];
+
+ if (Io2.IsFileRef)
+ {
+ break;
+ }
+
+ AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size});
+ ++m_IoVecCursor;
+ }
+
+ asio::async_write(TcpSocket,
+ std::move(AsioBuffers),
+ asio::transfer_all(),
+ [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) {
+ m_TotalBytesSent += ByteCount;
+ if (Ec)
+ {
+ m_SendCb(Ec, m_TotalBytesSent);
+ }
+ else
+ {
+ SendNextChunk(TcpSocket);
+ }
+ });
+ }
private:
- uint16_t m_ResponseCode = 0;
- bool m_IsKeepAlive = true;
- HttpContentType m_ContentType = HttpContentType::kBinary;
- uint64_t m_ContentLength = 0;
- std::vector<IoBuffer> m_DataBuffers;
- std::vector<asio::const_buffer> m_AsioBuffers;
- ExtendableStringBuilder<160> m_Headers;
+ uint32_t m_RequestNumber = 0;
+ uint16_t m_ResponseCode = 0;
+ bool m_IsKeepAlive = true;
+ HttpContentType m_ContentType = HttpContentType::kBinary;
+ uint64_t m_ContentLength = 0;
+ eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive
+ ExtendableStringBuilder<160> m_Headers;
+
+ struct IoVec
+ {
+ bool IsFileRef;
+ union
+ {
+ struct MemoryBuffer
+ {
+ const void* Data;
+ uint32_t Size;
+ } MemoryRef;
+ IoBufferFileReference FileRef;
+ } Ref;
+ };
+
+ eastl::fixed_vector<IoVec, 8> m_IoVecs;
+ int m_IoVecCursor = 0;
+ int m_IoVecCount = 0;
+
+ std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb;
+ uint64_t m_TotalBytesSent = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -339,37 +812,63 @@ private:
kTerminated
};
+ const char* StateToString(RequestState State)
+ {
+ switch (State)
+ {
+ case RequestState::kInitialState:
+ return "InitialState";
+ case RequestState::kInitialRead:
+ return "InitialRead";
+ case RequestState::kReadingMore:
+ return "ReadingMore";
+ case RequestState::kWriting:
+ return "Writing";
+ case RequestState::kWritingFinal:
+ return "WritingFinal";
+ case RequestState::kDone:
+ return "Done";
+ case RequestState::kTerminated:
+ return "Terminated";
+ default:
+ return "Unknown";
+ }
+ }
+
RequestState m_RequestState = RequestState::kInitialState;
HttpRequestParser m_RequestData{*this};
void EnqueueRead();
void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
- void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false);
+ void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop);
void CloseConnection();
- HttpAsioServerImpl& m_Server;
- asio::streambuf m_RequestBuffer;
- std::unique_ptr<asio::ip::tcp::socket> m_Socket;
- std::atomic<uint32_t> m_RequestCounter{0};
- uint32_t m_ConnectionId = 0;
- Ref<IHttpPackageHandler> m_PackageHandler;
+ HttpAsioServerImpl& m_Server;
+ asio::streambuf m_RequestBuffer;
+ std::atomic<uint32_t> m_RequestCounter{0};
+ uint32_t m_ConnectionId = 0;
+ Ref<IHttpPackageHandler> m_PackageHandler;
- RwLock m_ResponsesLock;
- std::deque<std::unique_ptr<HttpResponse>> m_Responses;
+ RwLock m_ActiveResponsesLock;
+ std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses;
+
+ std::unique_ptr<asio::ip::tcp::socket> m_Socket;
};
std::atomic<uint32_t> g_ConnectionIdCounter{0};
HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket)
: m_Server(Server)
-, m_Socket(std::move(Socket))
, m_ConnectionId(g_ConnectionIdCounter.fetch_add(1))
+, m_Socket(std::move(Socket))
{
ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId);
}
HttpServerConnection::~HttpServerConnection()
{
+ RwLock::ExclusiveLockScope _(m_ActiveResponsesLock);
+
ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId);
}
@@ -434,7 +933,11 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
return;
default:
- ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'",
+ m_ConnectionId,
+ StateToString(m_RequestState),
+ Ec.message());
+
return TerminateConnection();
}
}
@@ -472,37 +975,58 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
}
void
-HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop)
+HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec,
+ [[maybe_unused]] std::size_t ByteCount,
+ [[maybe_unused]] uint32_t RequestNumber,
+ HttpResponse* ResponseToPop)
{
ZEN_MEMSCOPE(GetHttpasioTag());
if (Ec)
{
- ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})",
+ m_ConnectionId,
+ StateToString(m_RequestState),
+ Ec.message(),
+ ByteCount);
+
TerminateConnection();
+
+ return;
}
- else
- {
- ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}",
- m_ConnectionId,
- m_RequestCounter.load(std::memory_order_relaxed),
- zen::GetCurrentThreadId(),
- NiceBytes(ByteCount));
- if (!m_RequestData.IsKeepAlive())
- {
- CloseConnection();
- }
- else
- {
- if (Pop)
+ ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}",
+ m_ConnectionId,
+ RequestNumber,
+ zen::GetCurrentThreadId(),
+ NiceBytes(ByteCount));
+
+ if (ResponseToPop)
+ {
+ m_ActiveResponsesLock.WithExclusiveLock([&] {
+ // Once a response is sent we can release any referenced resources
+ //
+ // completion callbacks may be issued out-of-order so we need to
+ // remove the relevant entry from our active response list, it may
+ // not be the first
+
+ if (auto It = find_if(begin(m_ActiveResponses),
+ end(m_ActiveResponses),
+ [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; });
+ It != end(m_ActiveResponses))
+ {
+ m_ActiveResponses.erase(It);
+ }
+ else
{
- RwLock::ExclusiveLockScope _(m_ResponsesLock);
- m_Responses.pop_front();
+ ZEN_WARN("response not found");
}
+ });
+ }
- m_RequestCounter.fetch_add(1);
- }
+ if (!m_RequestData.IsKeepAlive())
+ {
+ CloseConnection();
}
}
@@ -553,13 +1077,13 @@ HttpServerConnection::HandleRequest()
m_RequestState = RequestState::kWriting;
}
+ const uint32_t RequestNumber = m_RequestCounter.fetch_add(1);
+
if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url()))
{
ZEN_TRACE_CPU("asio::HandleRequest");
- const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed);
-
- HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body());
+ HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber);
ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber);
@@ -635,34 +1159,38 @@ HttpServerConnection::HandleRequest()
if (m_RequestData.RequestVerb() == HttpVerb::kHead)
{
- Response->SuppressPayload();
- }
+ ZEN_TRACE_CPU("asio::async_write");
- const std::vector<asio::const_buffer>& ResponseBuffers = Response->AsioBuffers();
+ std::string_view Headers = Response->GetHeaders();
- uint64_t ResponseLength = 0;
+ std::vector<asio::const_buffer> AsioBuffers;
+ AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size()));
- for (const asio::const_buffer& Buffer : ResponseBuffers)
- {
- ResponseLength += Buffer.size();
+ asio::async_write(*m_Socket.get(),
+ AsioBuffers,
+ asio::transfer_all(),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
-
+ else
{
- RwLock::ExclusiveLockScope _(m_ResponsesLock);
- m_Responses.push_back(std::move(Response));
- }
+ ZEN_TRACE_CPU("asio::async_write");
- // TODO: should cork/uncork for Linux?
+ HttpResponse* ResponseRaw = Response.get();
- {
- ZEN_TRACE_CPU("asio::async_write");
- asio::async_write(*m_Socket.get(),
- ResponseBuffers,
- asio::transfer_exactly(ResponseLength),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) {
- Conn->OnResponseDataSent(Ec, ByteCount, true);
- });
+ m_ActiveResponsesLock.WithExclusiveLock([&] {
+ // Keep referenced resources alive
+ m_ActiveResponses.push_back(std::move(Response));
+ });
+
+ ResponseRaw->SendResponse(
+ *m_Socket,
+ [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ ResponseRaw);
+ });
}
+
return;
}
}
@@ -681,10 +1209,11 @@ HttpServerConnection::HandleRequest()
"\r\n"sv;
}
- asio::async_write(
- *m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
+ asio::async_write(*m_Socket.get(),
+ asio::buffer(Response),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
else
{
@@ -706,10 +1235,11 @@ HttpServerConnection::HandleRequest()
"No suitable route found"sv;
}
- asio::async_write(
- *m_Socket.get(),
- asio::buffer(Response),
- [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); });
+ asio::async_write(*m_Socket.get(),
+ asio::buffer(Response),
+ [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) {
+ Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr);
+ });
}
}
@@ -1016,9 +1546,13 @@ private:
//////////////////////////////////////////////////////////////////////////
-HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer)
+HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request,
+ HttpService& Service,
+ IoBuffer PayloadBuffer,
+ uint32_t RequestNumber)
: HttpServerRequest(Service)
, m_Request(Request)
+, m_RequestNumber(RequestNumber)
, m_PayloadBuffer(std::move(PayloadBuffer))
{
const int PrefixLength = Service.UriPrefixLength();
@@ -1104,7 +1638,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode)
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(HttpContentType::kBinary));
+ m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber));
std::array<IoBuffer, 0> Empty;
m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty);
@@ -1117,7 +1651,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(ContentType));
+ m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs);
}
@@ -1127,7 +1661,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT
ZEN_MEMSCOPE(GetHttpasioTag());
ZEN_ASSERT(!m_Response);
- m_Response.reset(new HttpResponse(ContentType));
+ m_Response.reset(new HttpResponse(ContentType, m_RequestNumber));
IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size());
std::array<IoBuffer, 1> SingleBufferList({MessageBuffer});
diff --git a/src/zennet-test/zennet-test.cpp b/src/zennet-test/zennet-test.cpp
index 5e4d29220..bc3b8e8e9 100644
--- a/src/zennet-test/zennet-test.cpp
+++ b/src/zennet-test/zennet-test.cpp
@@ -16,6 +16,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char** argv)
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zennet_forcelinktests();
diff --git a/src/zenremotestore-test/zenremotestore-test.cpp b/src/zenremotestore-test/zenremotestore-test.cpp
index a49c6d273..5db185041 100644
--- a/src/zenremotestore-test/zenremotestore-test.cpp
+++ b/src/zenremotestore-test/zenremotestore-test.cpp
@@ -17,6 +17,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zenremotestore_forcelinktests();
diff --git a/src/zenremotestore/builds/buildsavedstate.cpp b/src/zenremotestore/builds/buildsavedstate.cpp
index 5a86ee865..1d1f4605f 100644
--- a/src/zenremotestore/builds/buildsavedstate.cpp
+++ b/src/zenremotestore/builds/buildsavedstate.cpp
@@ -207,9 +207,14 @@ ReadBuildSaveStateFile(const std::filesystem::path& StateFilePath)
{
ZEN_TRACE_CPU("ReadStateFile");
- FileContents FileData = ReadFile(StateFilePath);
+ IoBuffer DataBuffer;
+ {
+ BasicFile Source(StateFilePath, BasicFile::Mode::kRead);
+ DataBuffer = Source.ReadAll();
+ }
+
CbValidateError ValidateError;
- if (CbObject CurrentStateObject = ValidateAndReadCompactBinaryObject(FileData.Flatten(), ValidateError);
+ if (CbObject CurrentStateObject = ValidateAndReadCompactBinaryObject(std::move(DataBuffer), ValidateError);
ValidateError == CbValidateError::None)
{
if (CurrentStateObject)
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index b9f5eb07a..3ca2f72c1 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -782,78 +782,17 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Stopwatch LocalTimer;
- for (uint32_t LocalSequenceIndex = 0;
- LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- LocalSequenceIndex++)
- {
- const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
- const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
-
- {
- uint64_t SourceOffset = 0;
- const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
- for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
- {
- const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
- const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
-
- if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
- RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ ScavengeSourceForChunks(RemainingChunkCount,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RawHashToCopyChunkDataIndex,
+ SequenceIndexChunksLeftToWriteCounters,
+ m_LocalContent,
+ m_LocalLookup,
+ CopyChunkDatas,
+ uint32_t(-1),
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount,
+ m_CacheMappingStats.LocalChunkMatchingRemoteByteCount);
- if (!ChunkTargetPtrs.empty())
- {
- CopyChunkData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .RemoteChunkIndex = RemoteChunkIndex,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash);
- CopySourceIt != RawHashToCopyChunkDataIndex.end())
- {
- CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
- if (Data.TargetChunkLocationPtrs.size() > 1024)
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- else
- {
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- }
- else
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- m_CacheMappingStats.LocalChunkMatchingRemoteCount++;
- m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
- RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
- RemainingChunkCount--;
- }
- }
- }
- SourceOffset += LocalChunkRawSize;
- }
- }
- }
m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
}
@@ -867,86 +806,22 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
ScavengedContentIndex++)
{
const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
- // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
-
- for (uint32_t ScavengedSequenceIndex = 0;
- ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- ScavengedSequenceIndex++)
- {
- const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
- const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex];
-
- {
- uint64_t SourceOffset = 0;
- const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex];
- for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++)
- {
- const uint32_t ScavengedChunkIndex =
- ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex];
- const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex];
- const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
-
- if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash);
- RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- CopyChunkData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .RemoteChunkIndex = RemoteChunkIndex,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
- CopySourceIt != RawHashToCopyChunkDataIndex.end())
- {
- CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
- if (Data.TargetChunkLocationPtrs.size() > 1024)
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash,
- CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- else
- {
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- }
- else
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++;
- m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
- RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
- RemainingChunkCount--;
- }
- }
- }
- SourceOffset += ScavengedChunkRawSize;
- }
- }
- }
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ ScavengeSourceForChunks(RemainingChunkCount,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RawHashToCopyChunkDataIndex,
+ SequenceIndexChunksLeftToWriteCounters,
+ ScavengedContent,
+ ScavengedLookup,
+ CopyChunkDatas,
+ ScavengedContentIndex,
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount);
}
m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
}
+
if (!m_Options.IsQuiet)
{
if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 ||
@@ -2941,6 +2816,81 @@ BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source,
return true;
}
+void
+BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount,
+ std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex,
+ const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedFolderContent& ScavengedContent,
+ const ChunkedContentLookup& ScavengedLookup,
+ std::vector<CopyChunkData>& InOutCopyChunkDatas,
+ uint32_t ScavengedContentIndex,
+ uint64_t& InOutChunkMatchingRemoteCount,
+ uint64_t& InOutChunkMatchingRemoteByteCount)
+{
+ for (uint32_t RemoteChunkIndex = 0;
+ RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0);
+ RemoteChunkIndex++)
+ {
+ if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end())
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ const uint32_t ScavengedChunkIndex = It->second;
+ const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
+ const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex];
+ const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation =
+ ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset];
+ const IoHash& ScavengedSequenceRawHash =
+ ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex];
+
+ CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .RemoteChunkIndex = RemoteChunkIndex,
+ .CacheFileOffset = ScavengeLocation.Offset};
+ if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
+ CopySourceIt != InOutRawHashToCopyChunkDataIndex.end())
+ {
+ CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size());
+ InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengeLocation.SequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size());
+ InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengeLocation.SequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ InOutChunkMatchingRemoteCount++;
+ InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
+ InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
+ InOutRemainingChunkCount--;
+ }
+ }
+ }
+ }
+}
+
std::filesystem::path
BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
{
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp
index 15ece2edd..36b45e800 100644
--- a/src/zenremotestore/builds/buildstorageutil.cpp
+++ b/src/zenremotestore/builds/buildstorageutil.cpp
@@ -129,7 +129,10 @@ ResolveBuildStorage(OperationLogOutput& Output,
TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2, ClientSettings.Verbose);
TestResult.Success)
{
- ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl);
+ if (Verbose)
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl);
+ }
HostUrl = ServerEndpoint.BaseUrl;
HostAssumeHttp2 = ServerEndpoint.AssumeHttp2;
@@ -172,7 +175,10 @@ ResolveBuildStorage(OperationLogOutput& Output,
TestZenCacheEndpoint(CacheEndpoint.BaseUrl, CacheEndpoint.AssumeHttp2, ClientSettings.Verbose);
TestResult.Success)
{
- ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl);
+ if (Verbose)
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl);
+ }
CacheUrl = CacheEndpoint.BaseUrl;
CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2;
@@ -391,7 +397,10 @@ GetBlockDescriptions(OperationLogOutput& Output,
[BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; });
ListBlocksIt != FoundBlocks.end())
{
- ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash);
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash);
+ }
AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt));
}
else
diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp
index e8187d348..fda01aa56 100644
--- a/src/zenremotestore/chunking/chunkedcontent.cpp
+++ b/src/zenremotestore/chunking/chunkedcontent.cpp
@@ -108,7 +108,7 @@ namespace {
uint32_t PathIndex,
std::atomic<bool>& AbortFlag)
{
- ZEN_TRACE_CPU("ChunkFolderContent");
+ ZEN_TRACE_CPU("HashOneFile");
const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex];
const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex];
diff --git a/src/zenremotestore/filesystemutils.cpp b/src/zenremotestore/filesystemutils.cpp
index 8dff05c6b..fa1ce6f78 100644
--- a/src/zenremotestore/filesystemutils.cpp
+++ b/src/zenremotestore/filesystemutils.cpp
@@ -11,6 +11,11 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
namespace zen {
BufferedOpenFile::BufferedOpenFile(const std::filesystem::path Path,
@@ -349,13 +354,14 @@ CleanDirectory(
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- CleanDirectoryResult Result;
- RwLock ResultLock;
- auto _ = MakeGuard([&]() {
- Result.DeletedCount = DeletedItemCount.load();
- Result.DeletedByteCount = DeletedByteCount.load();
- Result.FoundCount = DiscoveredItemCount.load();
- });
+ std::vector<std::filesystem::path> DirectoriesToDelete;
+ CleanDirectoryResult Result;
+ RwLock ResultLock;
+ auto _ = MakeGuard([&]() {
+ Result.DeletedCount = DeletedItemCount.load();
+ Result.DeletedByteCount = DeletedByteCount.load();
+ Result.FoundCount = DiscoveredItemCount.load();
+ });
ParallelWork Work(AbortFlag,
PauseFlag,
@@ -363,119 +369,133 @@ CleanDirectory(
struct AsyncVisitor : public GetDirectoryContentVisitor
{
- AsyncVisitor(const std::filesystem::path& InPath,
- std::atomic<bool>& InAbortFlag,
- std::atomic<uint64_t>& InDiscoveredItemCount,
- std::atomic<uint64_t>& InDeletedItemCount,
- std::atomic<uint64_t>& InDeletedByteCount,
- std::span<const std::string> InExcludeDirectories,
- CleanDirectoryResult& InResult,
- RwLock& InResultLock)
+ AsyncVisitor(const std::filesystem::path& InPath,
+ std::atomic<bool>& InAbortFlag,
+ std::atomic<uint64_t>& InDiscoveredItemCount,
+ std::atomic<uint64_t>& InDeletedItemCount,
+ std::atomic<uint64_t>& InDeletedByteCount,
+ std::span<const std::string> InExcludeDirectories,
+ std::vector<std::filesystem::path>& OutDirectoriesToDelete,
+ CleanDirectoryResult& InResult,
+ RwLock& InResultLock)
: Path(InPath)
, AbortFlag(InAbortFlag)
, DiscoveredItemCount(InDiscoveredItemCount)
, DeletedItemCount(InDeletedItemCount)
, DeletedByteCount(InDeletedByteCount)
, ExcludeDirectories(InExcludeDirectories)
+ , DirectoriesToDelete(OutDirectoriesToDelete)
, Result(InResult)
, ResultLock(InResultLock)
{
}
+
+ virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const override
+ {
+ ZEN_UNUSED(Parent);
+
+ if (AbortFlag)
+ {
+ return false;
+ }
+ const std::string DirectoryString = DirectoryName.string();
+ for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ {
+ if (DirectoryString == ExcludeDirectory)
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override
{
ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory");
if (!AbortFlag)
{
- if (!Content.FileNames.empty())
+ DiscoveredItemCount += Content.FileNames.size();
+
+ ZEN_TRACE_CPU("DeleteFiles");
+ std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths;
+ for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++)
{
- DiscoveredItemCount += Content.FileNames.size();
+ const std::filesystem::path& FileName = Content.FileNames[FileIndex];
+ const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred();
- const std::string RelativeRootString = RelativeRoot.generic_string();
- bool RemoveContent = true;
- for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ bool IsRemoved = false;
+ std::error_code Ec;
+ (void)SetFileReadOnly(FilePath, false, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
{
- if (RelativeRootString.starts_with(ExcludeDirectory))
+ if (!IsFileWithRetry(FilePath))
{
- if (RelativeRootString.length() > ExcludeDirectory.length())
- {
- const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()];
- if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' ||
- MaybePathDelimiter == std::filesystem::path::preferred_separator)
- {
- RemoveContent = false;
- break;
- }
- }
- else
- {
- RemoveContent = false;
- break;
- }
+ IsRemoved = true;
+ Ec.clear();
+ break;
}
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ (void)SetFileReadOnly(FilePath, false, Ec);
}
- if (RemoveContent)
+ if (!IsRemoved && !Ec)
{
- ZEN_TRACE_CPU("DeleteFiles");
- for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++)
+ (void)RemoveFile(FilePath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 6; Retries++)
{
- const std::filesystem::path& FileName = Content.FileNames[FileIndex];
- const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred();
-
- bool IsRemoved = false;
- std::error_code Ec;
- (void)SetFileReadOnly(FilePath, false, Ec);
- for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ if (!IsFileWithRetry(FilePath))
{
- if (!IsFileWithRetry(FilePath))
- {
- IsRemoved = true;
- Ec.clear();
- break;
- }
- Sleep(100 + int(Retries * 50));
+ IsRemoved = true;
Ec.clear();
- (void)SetFileReadOnly(FilePath, false, Ec);
- }
- if (!IsRemoved && !Ec)
- {
- (void)RemoveFile(FilePath, Ec);
- for (size_t Retries = 0; Ec && Retries < 6; Retries++)
- {
- if (!IsFileWithRetry(FilePath))
- {
- IsRemoved = true;
- Ec.clear();
- return;
- }
- Sleep(100 + int(Retries * 50));
- Ec.clear();
- (void)RemoveFile(FilePath, Ec);
- }
- }
- if (!IsRemoved && Ec)
- {
- RwLock::ExclusiveLockScope _(ResultLock);
- Result.FailedRemovePaths.push_back(std::make_pair(FilePath, Ec));
- }
- else
- {
- DeletedItemCount++;
- DeletedByteCount += Content.FileSizes[FileIndex];
+ break;
}
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ (void)RemoveFile(FilePath, Ec);
}
}
+ if (!IsRemoved && Ec)
+ {
+ FailedRemovePaths.push_back(std::make_pair(FilePath, Ec));
+ }
+ else
+ {
+ DeletedItemCount++;
+ DeletedByteCount += Content.FileSizes[FileIndex];
+ }
+ }
+
+ if (!FailedRemovePaths.empty())
+ {
+ RwLock::ExclusiveLockScope _(ResultLock);
+ FailedRemovePaths.insert(FailedRemovePaths.end(), FailedRemovePaths.begin(), FailedRemovePaths.end());
+ }
+ else if (!RelativeRoot.empty())
+ {
+ DiscoveredItemCount++;
+ RwLock::ExclusiveLockScope _(ResultLock);
+ DirectoriesToDelete.push_back(RelativeRoot);
}
}
}
- const std::filesystem::path& Path;
- std::atomic<bool>& AbortFlag;
- std::atomic<uint64_t>& DiscoveredItemCount;
- std::atomic<uint64_t>& DeletedItemCount;
- std::atomic<uint64_t>& DeletedByteCount;
- std::span<const std::string> ExcludeDirectories;
- CleanDirectoryResult& Result;
- RwLock& ResultLock;
- } Visitor(Path, AbortFlag, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories, Result, ResultLock);
+ const std::filesystem::path& Path;
+ std::atomic<bool>& AbortFlag;
+ std::atomic<uint64_t>& DiscoveredItemCount;
+ std::atomic<uint64_t>& DeletedItemCount;
+ std::atomic<uint64_t>& DeletedByteCount;
+ std::span<const std::string> ExcludeDirectories;
+ std::vector<std::filesystem::path>& DirectoriesToDelete;
+ CleanDirectoryResult& Result;
+ RwLock& ResultLock;
+ } Visitor(Path,
+ AbortFlag,
+ DiscoveredItemCount,
+ DeletedItemCount,
+ DeletedByteCount,
+ ExcludeDirectories,
+ DirectoriesToDelete,
+ Result,
+ ResultLock);
GetDirectoryContent(Path,
DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes,
@@ -483,29 +503,6 @@ CleanDirectory(
IOWorkerPool,
Work.PendingWork());
- DirectoryContent LocalDirectoryContent;
- GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent);
- DiscoveredItemCount += LocalDirectoryContent.Directories.size();
- std::vector<std::filesystem::path> DirectoriesToDelete;
- DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size());
- for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
- {
- bool Leave = false;
- for (const std::string_view ExcludeDirectory : ExcludeDirectories)
- {
- if (LocalDirPath == (Path / ExcludeDirectory))
- {
- Leave = true;
- break;
- }
- }
- if (!Leave)
- {
- DirectoriesToDelete.emplace_back(std::move(LocalDirPath));
- DiscoveredItemCount++;
- }
- }
-
uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
if (ProgressFunc && ProgressUpdateDelayMS != 0)
@@ -528,6 +525,15 @@ CleanDirectory(
{
ZEN_TRACE_CPU("DeleteDirs");
+
+ std::sort(DirectoriesToDelete.begin(),
+ DirectoriesToDelete.end(),
+ [](const std::filesystem::path& Lhs, const std::filesystem::path& Rhs) {
+ auto DistanceLhs = std::distance(Lhs.begin(), Lhs.end());
+ auto DistanceRhs = std::distance(Rhs.begin(), Rhs.end());
+ return DistanceLhs > DistanceRhs;
+ });
+
for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete)
{
if (AbortFlag)
@@ -542,53 +548,48 @@ CleanDirectory(
}
}
- {
- std::error_code Ec;
- zen::CleanDirectory(DirectoryToDelete, /*ForceRemoveReadOnlyFiles*/ true, Ec);
- if (Ec)
- {
- Sleep(200);
- Ec.clear();
- zen::CleanDirectory(DirectoryToDelete, /*ForceRemoveReadOnlyFiles*/ true, Ec);
- }
+ const std::filesystem::path FullPath = Path / DirectoryToDelete;
- if (!Ec)
+ std::error_code Ec;
+ RemoveDir(FullPath, Ec);
+ if (Ec)
+ {
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
{
- RemoveDir(DirectoryToDelete, Ec);
- for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ if (!IsDir(FullPath))
{
- if (!IsDir(DirectoryToDelete))
- {
- Ec.clear();
- break;
- }
- Sleep(100 + int(Retries * 50));
Ec.clear();
- RemoveDir(DirectoryToDelete, Ec);
+ break;
}
- }
- if (Ec)
- {
- RwLock::ExclusiveLockScope __(ResultLock);
- Result.FailedRemovePaths.push_back(std::make_pair(DirectoryToDelete, Ec));
- }
- else
- {
- DeletedItemCount++;
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ RemoveDir(FullPath, Ec);
}
}
+ if (Ec)
+ {
+ RwLock::ExclusiveLockScope __(ResultLock);
+ Result.FailedRemovePaths.push_back(std::make_pair(DirectoryToDelete, Ec));
+ }
+ else
+ {
+ DeletedItemCount++;
+ }
- uint64_t NowMs = Timer.GetElapsedTimeMs();
-
- if ((NowMs - LastUpdateTimeMs) >= ProgressUpdateDelayMS)
+ if (ProgressFunc)
{
- LastUpdateTimeMs = NowMs;
+ uint64_t NowMs = Timer.GetElapsedTimeMs();
+
+ if ((NowMs - LastUpdateTimeMs) > 0)
+ {
+ LastUpdateTimeMs = NowMs;
- uint64_t Deleted = DeletedItemCount.load();
- uint64_t DeletedBytes = DeletedByteCount.load();
- uint64_t Discovered = DiscoveredItemCount.load();
- std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes));
- ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag);
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes));
+ ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag);
+ }
}
}
}
@@ -625,4 +626,72 @@ CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool,
return false;
}
+#if ZEN_WITH_TESTS
+
+void
+filesystemutils_forcelink()
+{
+}
+
+namespace {
+ void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); }
+} // namespace
+
+TEST_CASE("filesystemutils.CleanDirectory")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ CreateDirectories(TmpDir.Path() / ".keepme");
+ GenerateFile(TmpDir.Path() / ".keepme" / "keep");
+ GenerateFile(TmpDir.Path() / "deleteme1");
+ GenerateFile(TmpDir.Path() / "deleteme2");
+ GenerateFile(TmpDir.Path() / "deleteme3");
+ CreateDirectories(TmpDir.Path() / ".keepmenot");
+ CreateDirectories(TmpDir.Path() / "no.keepme");
+
+ CreateDirectories(TmpDir.Path() / "DeleteMe");
+ GenerateFile(TmpDir.Path() / "DeleteMe" / "delete1");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete1");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete3");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe" / ".keepme");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete3");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme");
+ CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2");
+ GenerateFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme");
+
+ WorkerThreadPool Pool(4);
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+
+ CleanDirectory(Pool, AbortFlag, PauseFlag, TmpDir.Path(), std::vector<std::string>{".keepme"}, {}, 0);
+
+ CHECK(IsDir(TmpDir.Path() / ".keepme"));
+ CHECK(IsFile(TmpDir.Path() / ".keepme" / "keep"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme1"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme2"));
+ CHECK(!IsFile(TmpDir.Path() / "deleteme3"));
+ CHECK(!IsFile(TmpDir.Path() / ".keepmenot"));
+ CHECK(!IsFile(TmpDir.Path() / "no.keepme"));
+
+ CHECK(!IsDir(TmpDir.Path() / "DeleteMe"));
+ CHECK(!IsDir(TmpDir.Path() / "DeleteMe2"));
+
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe" / ".keepme"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme"));
+ CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept"));
+ CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1"));
+ CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2"));
+ CHECK(!IsFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme"));
+}
+
+#endif
+
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index d78ee29c1..32c8bda01 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -141,7 +141,6 @@ public:
bool EnableTargetFolderScavenging = true;
bool ValidateCompletedSequences = true;
std::vector<std::string> ExcludeFolders;
- std::vector<std::string> ExcludeExtensions;
uint64_t MaximumInMemoryPayloadSize = 512u * 1024u;
bool PopulateCache = true;
};
@@ -257,6 +256,17 @@ private:
ChunkedFolderContent& OutScavengedLocalContent,
ChunkedContentLookup& OutScavengedLookup);
+ void ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount,
+ std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex,
+ const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedFolderContent& ScavengedContent,
+ const ChunkedContentLookup& ScavengedLookup,
+ std::vector<CopyChunkData>& InOutCopyChunkDatas,
+ uint32_t ScavengedContentIndex,
+ uint64_t& InOutChunkMatchingRemoteCount,
+ uint64_t& InOutChunkMatchingRemoteByteCount);
+
std::filesystem::path FindDownloadedChunk(const IoHash& ChunkHash);
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets(
diff --git a/src/zenremotestore/include/zenremotestore/filesystemutils.h b/src/zenremotestore/include/zenremotestore/filesystemutils.h
index a6c88e5cb..cfd6f02e1 100644
--- a/src/zenremotestore/include/zenremotestore/filesystemutils.h
+++ b/src/zenremotestore/include/zenremotestore/filesystemutils.h
@@ -116,4 +116,6 @@ bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool,
std::atomic<bool>& PauseFlag,
const std::filesystem::path& Directory);
+void filesystemutils_forcelink(); // internal
+
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h
index 044436509..a07ede6f6 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h
@@ -56,7 +56,7 @@ private:
const Oid m_BuildId;
const Options m_Options;
- Oid m_BuildPartId;
+ Oid m_BuildPartId = Oid::Zero;
CbObject m_BuildObject;
CbObject m_BuildPartsObject;
CbObject m_OpsSectionObject;
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index b566e5bed..8be8eb0df 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
#include <zenremotestore/chunking/chunkedfile.h>
@@ -266,6 +267,8 @@ namespace remotestore_impl {
&DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadBlockChunks");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -386,7 +389,9 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = Chunks]() {
+ Chunks = std::vector<IoHash>(Chunks)]() {
+ ZEN_TRACE_CPU("DownloadBlock");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -439,7 +444,7 @@ namespace remotestore_impl {
IgnoreMissingAttachments,
OptionalContext,
RetriesLeft,
- Chunks = Chunks,
+ Chunks = std::move(Chunks),
Bytes = std::move(BlockResult.Bytes)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
@@ -492,7 +497,7 @@ namespace remotestore_impl {
{});
return;
}
- SharedBuffer BlockPayload = Compressed.Decompress();
+ CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
if (!BlockPayload)
{
if (RetriesLeft > 0)
@@ -542,7 +547,7 @@ namespace remotestore_impl {
uint64_t BlockHeaderSize = 0;
bool StoreChunksOK = IterateChunkBlock(
- BlockPayload,
+ BlockPayload.Flatten(),
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](
CompressedBuffer&& Chunk,
const IoHash& AttachmentRawHash) {
@@ -648,6 +653,8 @@ namespace remotestore_impl {
&Info,
IgnoreMissingAttachments,
OptionalContext]() {
+ ZEN_TRACE_CPU("DownloadAttachment");
+
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -694,6 +701,8 @@ namespace remotestore_impl {
AttachmentSize,
Bytes = std::move(AttachmentResult.Bytes),
OptionalContext]() {
+ ZEN_TRACE_CPU("WriteAttachment");
+
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -745,6 +754,8 @@ namespace remotestore_impl {
Chunks = std::move(ChunksInBlock),
&AsyncOnBlock,
&RemoteResult]() mutable {
+ ZEN_TRACE_CPU("CreateBlock");
+
auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -917,6 +928,8 @@ namespace remotestore_impl {
&LooseFileAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadAttachment");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1039,6 +1052,8 @@ namespace remotestore_impl {
&BulkBlockAttachmentsToUpload,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("UploadChunk");
+
auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore,
AllowChunking,
&RemoteResult,
OptionalContext]() {
+ ZEN_TRACE_CPU("PrepareChunk");
+
auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -1972,10 +1989,16 @@ BuildContainer(CidStore& ChunkStore,
try
{
uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs();
- std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> AddedAttachmentHashes;
auto NewBlock = [&]() {
- size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
- size_t ChunkCount = ChunksInBlock.size();
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
+ size_t ChunkCount = ChunksInBlock.size();
+ std::vector<IoHash> ChunkRawHashes;
+ ChunkRawHashes.reserve(ChunkCount);
+ for (const std::pair<IoHash, FetchChunkFunc>& Chunk : ChunksInBlock)
+ {
+ ChunkRawHashes.push_back(Chunk.first);
+ }
if (BuildBlocks)
{
remotestore_impl::CreateBlock(WorkerPool,
@@ -1990,15 +2013,13 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
- ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
+ ZEN_INFO("Bulk group {} attachments", ChunkCount);
OnBlockChunks(std::move(ChunksInBlock));
}
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(),
- BlockAttachmentHashes.begin(),
- BlockAttachmentHashes.end());
+ Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
}
uint64_t NowMS = Timer.GetElapsedTimeMs();
ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
@@ -2007,7 +2028,6 @@ BuildContainer(CidStore& ChunkStore,
NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
NiceBytes(BlockSize));
FetchAttachmentsStartMS = NowMS;
- BlockAttachmentHashes.clear();
ChunksInBlock.clear();
BlockSize = 0;
GeneratedBlockCount++;
@@ -2039,8 +2059,17 @@ BuildContainer(CidStore& ChunkStore,
ZEN_ASSERT(InfoIt != UploadAttachments.end());
uint64_t PayloadSize = InfoIt->second.Size;
- if (BlockAttachmentHashes.insert(AttachmentHash).second)
+ if (AddedAttachmentHashes.insert(AttachmentHash).second)
{
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) &&
+ (CurrentOpKey != LastOpKey))
+ {
+ NewBlock();
+ }
+ }
+
if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
{
ChunksInBlock.emplace_back(std::make_pair(
@@ -2079,10 +2108,6 @@ BuildContainer(CidStore& ChunkStore,
}
BlockSize += PayloadSize;
- if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey))
- {
- NewBlock();
- }
LastOpKey = CurrentOpKey;
ChunksAssembled++;
}
@@ -2123,9 +2148,17 @@ BuildContainer(CidStore& ChunkStore,
const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
{
- if (BlockAttachmentHashes.insert(ChunkHash).second)
+ if (AddedAttachmentHashes.insert(ChunkHash).second)
{
const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
+ uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size);
+ if (BuildBlocks && ChunksInBlock.size() > 0)
+ {
+ if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock)
+ {
+ NewBlock();
+ }
+ }
ChunksInBlock.emplace_back(
std::make_pair(ChunkHash,
[Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
@@ -2136,13 +2169,6 @@ BuildContainer(CidStore& ChunkStore,
OodleCompressionLevel::None)};
}));
BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
- if (BuildBlocks)
- {
- if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock)
- {
- NewBlock();
- }
- }
ChunksAssembled++;
}
ChunkedHashes.erase(FindIt);
@@ -2781,12 +2807,26 @@ ParseOplogContainer(const CbObject& ContainerObject,
for (CbFieldView OpEntry : OpsArray)
{
OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); });
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
}
}
{
std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end());
OnReferencedAttachments(ReferencedAttachments);
}
+
+ if (remotestore_impl::IsCancelled(OptionalContext))
+ {
+ return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = "Operation cancelled"};
+ }
+
remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size()));
CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
@@ -3206,6 +3246,8 @@ LoadOplog(CidStore& ChunkStore,
IgnoreMissingAttachments,
&Info,
OptionalContext]() {
+ ZEN_TRACE_CPU("DechunkAttachment");
+
auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
std::error_code Ec;
if (IsFile(TempFileName, Ec))
@@ -3232,7 +3274,7 @@ LoadOplog(CidStore& ChunkStore,
{
BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
BLAKE3Stream HashingStream;
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
@@ -3255,15 +3297,80 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
+
+ IoHash RawHash;
+ uint64_t RawSize;
+
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
+ if (RawHash != ChunkHash)
{
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
+ RawHash,
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
+
+ {
+ ZEN_TRACE_CPU("DecompressChunk");
+
+ if (!Compressed.DecompressToStream(0,
+ RawSize,
+ [&](uint64_t SourceOffset,
+ uint64_t SourceSize,
+ uint64_t Offset,
+ const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+
+ for (const SharedBuffer& Segment :
+ RangeBuffer.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(),
+ SegmentData.GetSize(),
+ ChunkOffset + Offset);
+ }
+ return true;
+ }))
+ {
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Failed to decompress chunk {} for chunked attachment {}",
+ ChunkHash,
+ Chunked.RawHash));
+ }
+ return;
+ }
}
+ ChunkOffset += RawSize;
}
BLAKE3 RawHash = HashingStream.GetHash();
ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
diff --git a/src/zenremotestore/zenremotestore.cpp b/src/zenremotestore/zenremotestore.cpp
index e074455b3..7f785599f 100644
--- a/src/zenremotestore/zenremotestore.cpp
+++ b/src/zenremotestore/zenremotestore.cpp
@@ -5,6 +5,7 @@
#include <zenremotestore/builds/buildsavedstate.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
+#include <zenremotestore/filesystemutils.h>
#include <zenremotestore/projectstore/remoteprojectstore.h>
#if ZEN_WITH_TESTS
@@ -19,6 +20,7 @@ zenremotestore_forcelinktests()
chunkedcontent_forcelink();
chunkedfile_forcelink();
chunkedcontent_forcelink();
+ filesystemutils_forcelink();
remoteprojectstore_forcelink();
}
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 42296cbe1..418fc7978 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -61,6 +61,10 @@ zen::ZenServerEnvironment TestEnv;
int
main(int argc, char** argv)
{
+# if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+# endif // ZEN_PLATFORM_WINDOWS
+
using namespace std::literals;
using namespace zen;
diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp
index 8abf6e8a3..d8d53b0e3 100644
--- a/src/zenserver/diag/diagsvcs.cpp
+++ b/src/zenserver/diag/diagsvcs.cpp
@@ -28,30 +28,6 @@ GetHealthTag()
using namespace std::literals;
-static bool
-ReadLogFile(const std::string& Path, StringBuilderBase& Out)
-{
- try
- {
- constexpr auto ReadSize = std::size_t{4096};
- auto FileStream = std::ifstream{Path};
-
- std::string Buf(ReadSize, '\0');
- while (FileStream.read(&Buf[0], ReadSize))
- {
- Out.Append(std::string_view(&Buf[0], FileStream.gcount()));
- }
- Out.Append(std::string_view(&Buf[0], FileStream.gcount()));
-
- return true;
- }
- catch (const std::exception&)
- {
- Out.Reset();
- return false;
- }
-}
-
HttpHealthService::HttpHealthService()
{
ZEN_MEMSCOPE(GetHealthTag());
@@ -95,10 +71,9 @@ HttpHealthService::HttpHealthService()
return m_HealthInfo.AbsLogPath.empty() ? m_HealthInfo.DataRoot / "logs/zenserver.log" : m_HealthInfo.AbsLogPath;
}();
- ExtendableStringBuilder<4096> Sb;
- if (ReadLogFile(Path.string(), Sb) && Sb.Size() > 0)
+ if (IoBuffer LogBuffer = IoBufferBuilder::MakeFromFile(Path))
{
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, Sb.ToView());
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, LogBuffer);
}
else
{
diff --git a/src/zenserver/frontend/html.zip b/src/zenserver/frontend/html.zip
index f9fc8a8ef..36f08a05d 100644
--- a/src/zenserver/frontend/html.zip
+++ b/src/zenserver/frontend/html.zip
Binary files differ
diff --git a/src/zenserver/frontend/html/util/component.js b/src/zenserver/frontend/html/util/component.js
index 205aa038e..3c4780d77 100644
--- a/src/zenserver/frontend/html/util/component.js
+++ b/src/zenserver/frontend/html/util/component.js
@@ -72,7 +72,7 @@ class ComponentDom extends ComponentBase
text(value)
{
value = (value == undefined) ? "undefined" : value.toString();
- this._element.innerHTML = (value != "") ? value : "";
+ this._element.textContent = (value != "") ? value : "";
return this;
}
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 34848c831..da256d06d 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -206,6 +206,10 @@ AppMain(int argc, char* argv[])
int
test_main(int argc, char** argv)
{
+# if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+# endif // ZEN_PLATFORM_WINDOWS
+
zen::logging::InitializeLogging();
zen::logging::SetLogLevel(zen::logging::level::Debug);
@@ -218,6 +222,10 @@ test_main(int argc, char** argv)
int
main(int argc, char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
using namespace zen;
if (argc >= 2)
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index ab8dbb16b..95ea114bb 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -122,7 +122,9 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState::
if (m_ServerMutex.Create(MutexName) == false)
{
- ThrowLastError(fmt::format("Failed to create mutex '{}'", MutexName).c_str());
+ std::error_code Ec = MakeErrorCodeFromLastError();
+ ZEN_WARN("Failed to create server mutex '{}'. Reason: '{}' ({})", MutexName, Ec.message(), Ec.value());
+ return -1;
}
EnqueueSigIntTimer();
diff --git a/src/zenstore-test/zenstore-test.cpp b/src/zenstore-test/zenstore-test.cpp
index 6df7162fd..c055dbb64 100644
--- a/src/zenstore-test/zenstore-test.cpp
+++ b/src/zenstore-test/zenstore-test.cpp
@@ -16,6 +16,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zenstore_forcelinktests();
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index f97c98e08..0542d1171 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
LargestSize = Max(LargestSize, Size);
}
- const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t MinSize = Max(LargestSize, 512u * 1024u);
const uint64_t BufferSize = Min(TotalSize, MinSize);
std::vector<uint8_t> Buffer(BufferSize);
@@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); });
+ if (Count > 1)
{
+ if (Buffer.empty())
+ {
+ Buffer.resize(BufferSize);
+ }
MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
for (size_t Index = 0; Index < Count; Index++)
{
@@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
}
WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ MemoryView SourceBuffer = Datas[Offset];
+ WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset);
+ m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed);
}
-
- m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
uint32_t ChunkOffset = AlignedInsertOffset;
std::vector<BlockStoreLocation> Locations(Count);
@@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
bool
BlockStore::HasChunk(const BlockStoreLocation& Location) const
{
- ZEN_TRACE_CPU("BlockStore::TryGetChunk");
+ ZEN_TRACE_CPU("BlockStore::HasChunk");
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
InsertLock.ReleaseNow();
@@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
+ InsertLock.ReleaseNow();
+
IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size);
if (Chunk.GetSize() == Location.Size)
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index a5de5c448..37a8c36b8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_TRACE_CPU("CasContainer::FindChunk");
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
auto KeyIt = m_LocationMap.find(ChunkHash);
if (KeyIt == m_LocationMap.end())
{
return IoBuffer();
}
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
return Chunk;
@@ -316,10 +317,11 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
return m_BlockStore.HasChunk(Location);
}
return false;
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index f1001f665..c5b27c1ea 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -3917,7 +3917,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+ ZEN_INFO("scrubbing '{}'", m_OplogStoragePath);
// Scrubbing needs to check all existing oplogs
std::vector<std::string> OpLogs = ScanForOplogs();
diff --git a/src/zentelemetry-test/zentelemetry-test.cpp b/src/zentelemetry-test/zentelemetry-test.cpp
index c8b067226..83fd549db 100644
--- a/src/zentelemetry-test/zentelemetry-test.cpp
+++ b/src/zentelemetry-test/zentelemetry-test.cpp
@@ -16,6 +16,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zentelemetry_forcelinktests();
diff --git a/src/zenutil-test/zenutil-test.cpp b/src/zenutil-test/zenutil-test.cpp
index 3e3a11a01..f5cfd5a72 100644
--- a/src/zenutil-test/zenutil-test.cpp
+++ b/src/zenutil-test/zenutil-test.cpp
@@ -16,6 +16,10 @@
int
main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
{
+#if ZEN_PLATFORM_WINDOWS
+ setlocale(LC_ALL, "en_us.UTF8");
+#endif // ZEN_PLATFORM_WINDOWS
+
#if ZEN_WITH_TESTS
zen::zenutil_forcelinktests();
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 4d10f3794..8901b7779 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <spdlog/sinks/sink.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <atomic>
#include <filesystem>
namespace zen::logging {
@@ -248,7 +249,7 @@ private:
const std::size_t m_MaxSize;
const std::size_t m_MaxFiles;
BasicFile m_CurrentFile;
- bool m_NeedFlush = false;
+ std::atomic<bool> m_NeedFlush = false;
};
} // namespace zen::logging