diff options
42 files changed, 1683 insertions, 644 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 0eb1574fe..47ef6f3e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,28 @@ ## +- Feature: Added `--exclude-folders` to `zen upload`, `zen download` and `zen diff` to extend the default exclude folders. Each folder name is separated with ';' or ',' + - The default folders that are always excluded are: + - `.unsync` + - `.zen` + - `.ugs` + - `.zen-tmp` +- Feature: Added `--exclude-extensions` to `zen upload` and `zen diff` to set the file extensions to exclude. Each extension name is separated with ';' or ',' +- Feature: Added `--result-path` option to `zen builds ls` to write structured output to a file +- Improvement: On Windows, the asio HTTP back-end now uses TransmitFile to send data directly from files instead of using memory mapping. - Improvement: Optimized scavenge lookup performance - Improvement: Enable limit-overwrite behavior by default +- Improvement: Validate chunk hashes when dechunking files in oplog import +- Improvement: Use stream decompression when dechunking files +- Improvement: When assembling blocks for oplog export, make sure we keep under/at block size limit +- Improvement: Make cancelling of oplog import more responsive +- Improvement: Use decompress to composite to avoid allocating a new memory buffer for uncompressed chunks during oplog import +- Improvement: Reduce memory buffer size and allocate it on demand when writing multiple chunks to block store +- Improvement: Reduce lock contention when fetching/checking existence of chunks in block store +- Improvement: If we fail to create the server mutex, gracefully report the problem and exit without sending error to Sentry +- Improvement: Excluded folder names are now matched by folder name in subfolders in addition to root level folders +- Improvement: If the path given in `--oidctoken-exe-path` for zen commands is invalid, exit with error instead of ignoring the argument - Bugfix: Upstream propagation of Put operations would not retain 
the overwrite cache policy if it was used +- Bugfix: `zen oplog-download` failed to initialize build part id causing it to fail to download the build part +- Bugfix: Windows: Set up UTF8 as current locale to properly handle non-ascii characters ## 5.7.15 - Feature: `zen oplog-export`, `zen oplog-import` and `zen oplog-download` now has options to boost workers diff --git a/VERSION.txt b/VERSION.txt index dcfab7848..d6542901d 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -5.7.15
\ No newline at end of file +5.7.16
\ No newline at end of file diff --git a/src/zen/authutils.cpp b/src/zen/authutils.cpp index fdcb8e15d..31db82efd 100644 --- a/src/zen/authutils.cpp +++ b/src/zen/authutils.cpp @@ -283,6 +283,10 @@ AuthCommandLineOptions::ParseOptions(cxxopts::Options& Ops, ClientSettings.AccessTokenProvider = httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, HostUrl, Quiet, m_OidcTokenUnattended, Hidden); } + else if (!m_OidcTokenAuthExecutablePath.empty()) + { + throw OptionParseException(fmt::format("'--oidctoken-exe-path' ('{}') does not exist", m_OidcTokenAuthExecutablePath), Ops.help()); + } if (!ClientSettings.AccessTokenProvider) { diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 25f66e0ee..d7980cc24 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -427,22 +427,24 @@ namespace { NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000)); } - void UploadFolder(OperationLogOutput& Output, - TransferThreadWorkers& Workers, - StorageInstance& Storage, - const Oid& BuildId, - const Oid& BuildPartId, - const std::string_view BuildPartName, - const std::filesystem::path& Path, - const std::filesystem::path& TempDir, - const std::filesystem::path& ManifestPath, - const uint64_t FindBlockMaxCount, - const uint8_t BlockReuseMinPercentLimit, - bool AllowMultiparts, - const CbObject& MetaData, - bool CreateBuild, - bool IgnoreExistingBlocks, - bool UploadToZenCache) + void UploadFolder(OperationLogOutput& Output, + TransferThreadWorkers& Workers, + StorageInstance& Storage, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const std::filesystem::path& Path, + const std::filesystem::path& TempDir, + const std::filesystem::path& ManifestPath, + const uint64_t FindBlockMaxCount, + const uint8_t BlockReuseMinPercentLimit, + bool AllowMultiparts, + const CbObject& MetaData, + bool CreateBuild, + bool IgnoreExistingBlocks, + bool UploadToZenCache, + const 
std::vector<std::string>& ExcludeFolders, + const std::vector<std::string>& ExcludeExtensions) { ProgressBar::SetLogOperationName(ProgressMode, "Upload Folder"); { @@ -470,8 +472,8 @@ namespace { .AllowMultiparts = AllowMultiparts, .IgnoreExistingBlocks = IgnoreExistingBlocks, .TempDir = TempDir, - .ExcludeFolders = DefaultExcludeFolders, - .ExcludeExtensions = DefaultExcludeExtensions, + .ExcludeFolders = ExcludeFolders, + .ExcludeExtensions = ExcludeExtensions, .ZenExcludeManifestName = ZenExcludeManifestName, .NonCompressableExtensions = DefaultSplitOnlyExtensions, .PopulateCache = UploadToZenCache}); @@ -682,12 +684,13 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; - void VerifyFolder(TransferThreadWorkers& Workers, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const std::filesystem::path& Path, - bool VerifyFileHash, - VerifyFolderStatistics& VerifyFolderStats) + void VerifyFolder(TransferThreadWorkers& Workers, + const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::filesystem::path& Path, + const std::vector<std::string>& ExcludeFolders, + bool VerifyFileHash, + VerifyFolderStatistics& VerifyFolderStats) { ZEN_TRACE_CPU("VerifyFolder"); @@ -704,7 +707,7 @@ namespace { RwLock ErrorLock; std::vector<std::string> Errors; - auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { + auto IsAcceptedFolder = [ExcludeFolders = ExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) @@ -1593,6 +1596,7 @@ namespace { uint64_t MaximumInMemoryPayloadSize = 512u * 1024u; bool PopulateCache = true; bool AppendNewContent = false; + std::vector<std::string> ExcludeFolders; }; void DownloadFolder(OperationLogOutput& Output, @@ -1834,8 +1838,7 @@ namespace { .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, 
.EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent, .ValidateCompletedSequences = Options.PostDownloadVerify, - .ExcludeFolders = DefaultExcludeFolders, - .ExcludeExtensions = DefaultExcludeExtensions, + .ExcludeFolders = Options.ExcludeFolders, .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize, .PopulateCache = Options.PopulateCache}); { @@ -1861,7 +1864,13 @@ namespace { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount); - VerifyFolder(Workers, RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats); + VerifyFolder(Workers, + RemoteContent, + RemoteLookup, + Path, + Options.ExcludeFolders, + Options.PostDownloadVerify, + VerifyFolderStats); Stopwatch WriteStateTimer; CbObject StateObject = CreateBuildSaveStateObject(LocalState); @@ -1999,88 +2008,157 @@ namespace { const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, std::span<const std::string> IncludeWildcards, - std::span<const std::string> ExcludeWildcards) + std::span<const std::string> ExcludeWildcards, + CbObjectWriter* OptionalStructuredOutput) { std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; CbObject BuildObject = GetBuild(*Storage.BuildStorage, BuildId); + if (OptionalStructuredOutput != nullptr) + { + OptionalStructuredOutput->AddObjectId("buildId"sv, BuildId); + OptionalStructuredOutput->AddObject("build"sv, BuildObject); + } + std::vector<std::pair<Oid, std::string>> AllBuildParts = ResolveBuildPartNames(BuildObject, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); - Stopwatch GetBuildPartTimer; - - for (size_t BuildPartIndex = 0; BuildPartIndex < AllBuildParts.size(); BuildPartIndex++) + if (!AllBuildParts.empty()) { - const Oid BuildPartId = AllBuildParts[BuildPartIndex].first; - const std::string_view BuildPartName = AllBuildParts[BuildPartIndex].second; - CbObject BuildPartManifest = 
Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); + Stopwatch GetBuildPartTimer; - if (!IsQuiet) + if (OptionalStructuredOutput != nullptr) { - ZEN_CONSOLE("{}Part: {} ('{}'):\n", - BuildPartIndex > 0 ? "\n" : "", - BuildPartId, - BuildPartName, - NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), - NiceBytes(BuildPartManifest.GetSize())); + OptionalStructuredOutput->BeginArray("parts"sv); } - std::vector<std::filesystem::path> Paths; - std::vector<IoHash> RawHashes; - std::vector<uint64_t> RawSizes; - std::vector<uint32_t> Attributes; - - SourcePlatform Platform; - std::vector<IoHash> SequenceRawHashes; - std::vector<uint32_t> ChunkCounts; - std::vector<uint32_t> AbsoluteChunkOrders; - std::vector<IoHash> LooseChunkHashes; - std::vector<uint64_t> LooseChunkRawSizes; - std::vector<IoHash> BlockRawHashes; + for (size_t BuildPartIndex = 0; BuildPartIndex < AllBuildParts.size(); BuildPartIndex++) + { + const Oid BuildPartId = AllBuildParts[BuildPartIndex].first; + const std::string_view BuildPartName = AllBuildParts[BuildPartIndex].second; + CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); - ReadBuildContentFromCompactBinary(BuildPartManifest, - Platform, - Paths, - RawHashes, - RawSizes, - Attributes, - SequenceRawHashes, - ChunkCounts, - AbsoluteChunkOrders, - LooseChunkHashes, - LooseChunkRawSizes, - BlockRawHashes); + if (OptionalStructuredOutput != nullptr) + { + OptionalStructuredOutput->BeginObject(); + OptionalStructuredOutput->AddObjectId("id"sv, BuildPartId); + OptionalStructuredOutput->AddString("partName"sv, BuildPartName); + } + { + if (OptionalStructuredOutput != nullptr) + { + } + else if (!IsQuiet) + { + ZEN_CONSOLE("{}Part: {} ('{}'):\n", + BuildPartIndex > 0 ? 
"\n" : "", + BuildPartId, + BuildPartName, + NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), + NiceBytes(BuildPartManifest.GetSize())); + } - std::vector<size_t> Order(Paths.size()); - std::iota(Order.begin(), Order.end(), 0); + std::vector<std::filesystem::path> Paths; + std::vector<IoHash> RawHashes; + std::vector<uint64_t> RawSizes; + std::vector<uint32_t> Attributes; + + SourcePlatform Platform; + std::vector<IoHash> SequenceRawHashes; + std::vector<uint32_t> ChunkCounts; + std::vector<uint32_t> AbsoluteChunkOrders; + std::vector<IoHash> LooseChunkHashes; + std::vector<uint64_t> LooseChunkRawSizes; + std::vector<IoHash> BlockRawHashes; + + ReadBuildContentFromCompactBinary(BuildPartManifest, + Platform, + Paths, + RawHashes, + RawSizes, + Attributes, + SequenceRawHashes, + ChunkCounts, + AbsoluteChunkOrders, + LooseChunkHashes, + LooseChunkRawSizes, + BlockRawHashes); + + std::vector<size_t> Order(Paths.size()); + std::iota(Order.begin(), Order.end(), 0); + + std::sort(Order.begin(), Order.end(), [&](size_t Lhs, size_t Rhs) { + const std::filesystem::path& LhsPath = Paths[Lhs]; + const std::filesystem::path& RhsPath = Paths[Rhs]; + return LhsPath < RhsPath; + }); - std::sort(Order.begin(), Order.end(), [&](size_t Lhs, size_t Rhs) { - const std::filesystem::path& LhsPath = Paths[Lhs]; - const std::filesystem::path& RhsPath = Paths[Rhs]; - return LhsPath < RhsPath; - }); + if (OptionalStructuredOutput != nullptr) + { + OptionalStructuredOutput->BeginArray("files"sv); + } + { + for (size_t Index : Order) + { + const std::filesystem::path& Path = Paths[Index]; + if (IncludePath(IncludeWildcards, ExcludeWildcards, Path)) + { + const IoHash& RawHash = RawHashes[Index]; + const uint64_t RawSize = RawSizes[Index]; + const uint32_t Attribute = Attributes[Index]; - for (size_t Index : Order) - { - const std::filesystem::path& Path = Paths[Index]; - if (IncludePath(IncludeWildcards, ExcludeWildcards, Path)) + if (OptionalStructuredOutput != nullptr) + { + 
OptionalStructuredOutput->BeginObject(); + { + OptionalStructuredOutput->AddString("path"sv, fmt::format("{}", Path)); + OptionalStructuredOutput->AddInteger("rawSize"sv, RawSize); + switch (Platform) + { + case SourcePlatform::Windows: + OptionalStructuredOutput->AddInteger("attributes"sv, Attribute); + break; + case SourcePlatform::MacOS: + case SourcePlatform::Linux: + OptionalStructuredOutput->AddString("chmod"sv, fmt::format("{:#04o}", Attribute)); + break; + default: + throw std::runtime_error(fmt::format("Unsupported platform: {}", (int)Platform)); + } + } + OptionalStructuredOutput->EndObject(); + } + else + { + ZEN_CONSOLE("{}\t{}\t{}", Path, RawSize, RawHash); + } + } + } + } + if (OptionalStructuredOutput != nullptr) + { + OptionalStructuredOutput->EndArray(); // "files" + } + } + if (OptionalStructuredOutput != nullptr) { - const IoHash& RawHash = RawHashes[Index]; - const uint64_t RawSize = RawSizes[Index]; - const uint32_t Attribute = Attributes[Index]; - ZEN_UNUSED(Attribute); - - ZEN_CONSOLE("{}\t{}\t{}", Path, RawSize, RawHash); + OptionalStructuredOutput->EndObject(); } } + if (OptionalStructuredOutput != nullptr) + { + OptionalStructuredOutput->EndArray(); // parts + } } } - void DiffFolders(TransferThreadWorkers& Workers, - const std::filesystem::path& BasePath, - const std::filesystem::path& ComparePath, - bool OnlyChunked) + void DiffFolders(TransferThreadWorkers& Workers, + const std::filesystem::path& BasePath, + const std::filesystem::path& ComparePath, + bool OnlyChunked, + const std::vector<std::string>& InExcludeFolders, + const std::vector<std::string>& InExcludeExtensions) { ZEN_TRACE_CPU("DiffFolders"); @@ -2104,7 +2182,7 @@ namespace { { StandardChunkingControllerSettings ChunkingSettings; std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(ChunkingSettings); - std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions; + std::vector<std::string> ExcludeExtensions = InExcludeExtensions; 
if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), @@ -2115,7 +2193,7 @@ namespace { ChunkingSettings.SplitAndCompressExtensions.end()); } - auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { + auto IsAcceptedFolder = [ExcludeFolders = InExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) @@ -2404,6 +2482,25 @@ BuildsCommand::BuildsCommand() "<excludewildcard>"); }; + auto AddExcludeFolderOption = [this](cxxopts::Options& Ops) { + Ops.add_option("", + "", + "exclude-folders", + "Names of folders to exclude, separated by ;", + cxxopts::value(m_ExcludeFolders), + "<excludefolders>"); + }; + + auto AddExcludeExtensionsOption = [this](cxxopts::Options& Ops) { + Ops.add_option("", + "", + "exclude-extensions", + "Extensions to exclude, separated by ;" + "include filter", + cxxopts::value(m_ExcludeExtensions), + "<excludeextensions>"); + }; + auto AddMultipartOptions = [this](cxxopts::Options& Ops) { Ops.add_option("", "", @@ -2463,12 +2560,13 @@ BuildsCommand::BuildsCommand() "Enable fetch of buckets within namespaces also", cxxopts::value(m_ListNamespacesRecursive), "<recursive>"); - m_ListNamespacesOptions.add_option("", - "", - "result-path", - "Path to json or compactbinary to write query result to", - cxxopts::value(m_ListResultPath), - "<result-path>"); + m_ListNamespacesOptions.add_option( + "", + "", + "result-path", + "Path to json (.json) or compactbinary (.cbo) to write output result to. 
Default is output to console", + cxxopts::value(m_ListResultPath), + "<result-path>"); m_ListNamespacesOptions.parse_positional({"result-path"}); m_ListNamespacesOptions.positional_help("result-path"); @@ -2488,7 +2586,7 @@ BuildsCommand::BuildsCommand() m_ListOptions.add_option("", "", "result-path", - "Path to json or compactbinary to write query result to", + "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console", cxxopts::value(m_ListResultPath), "<result-path>"); m_ListOptions.parse_positional({"query-path", "result-path"}); @@ -2503,7 +2601,7 @@ BuildsCommand::BuildsCommand() m_ListBlocksOptions.add_option("", "", "result-path", - "Path to json or compactbinary to write query result to", + "Path to json (.json) or compactbinary (.cbo) to write output result to. Default is output to console", cxxopts::value(m_ListResultPath), "<result-path>"); @@ -2521,6 +2619,8 @@ BuildsCommand::BuildsCommand() AddCacheOptions(m_UploadOptions); AddWorkerOptions(m_UploadOptions); AddZenFolderOptions(m_UploadOptions); + AddExcludeFolderOption(m_UploadOptions); + AddExcludeExtensionsOption(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_UploadOptions.add_option("", @@ -2599,6 +2699,7 @@ BuildsCommand::BuildsCommand() AddWorkerOptions(m_DownloadOptions); AddWildcardOptions(m_DownloadOptions); AddAppendNewContentOptions(m_DownloadOptions); + AddExcludeFolderOption(m_DownloadOptions); m_DownloadOptions.add_option("cache", "", @@ -2691,12 +2792,28 @@ BuildsCommand::BuildsCommand() cxxopts::value(m_BuildPartNames), "<name>"); + m_LsOptions.add_option("", + "", + "result-path", + "Path to json (.json) or compactbinary (.cbo) to write output result to. 
Default is output to console", + cxxopts::value(m_LsResultPath), + "<result-path>"); + + m_LsOptions.add_option("", + "o", + "output-path", + "Path to output, extension .json or .cb (compac binary). Default is output to console", + cxxopts::value(m_LsResultPath), + "<output-path>"); + m_LsOptions.parse_positional({"build-id", "wildcard"}); m_LsOptions.positional_help("build-id wildcard"); // diff AddOutputOptions(m_DiffOptions); AddWorkerOptions(m_DiffOptions); + AddExcludeFolderOption(m_DiffOptions); + AddExcludeExtensionsOption(m_DiffOptions); m_DiffOptions.add_options()("h,help", "Print help"); m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>"); @@ -3188,6 +3305,28 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) OutExcludeWildcards = SplitWildcard(m_ExcludeWildcard); }; + auto ParseExcludeFolderAndExtension = [&](std::vector<std::string>& OutExcludeFolders, std::vector<std::string>& OutExcludeExtensions) { + auto SplitExclusion = [](const std::string_view Input) -> std::vector<std::string> { + std::vector<std::string> Exclusions; + ForEachStrTok(Input, ";,", [&Exclusions](std::string_view Exclusion) { + if (!Exclusion.empty()) + { + std::string CleanExclusion(ToLower(Exclusion)); + if (CleanExclusion.length() > 2 && CleanExclusion.front() == '"' && CleanExclusion.back() == '"') + { + CleanExclusion = CleanExclusion.substr(1, CleanExclusion.length() - 2); + } + Exclusions.emplace_back(std::move(CleanExclusion)); + } + return true; + }); + return Exclusions; + }; + + OutExcludeFolders = SplitExclusion(m_ExcludeFolders); + OutExcludeExtensions = SplitExclusion(m_ExcludeExtensions); + }; + auto ParseDiffPath = [&]() { if (m_DiffPath.empty()) { @@ -3600,6 +3739,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int 
argc, char** argv) const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath); + std::vector<std::string> ExcludeFolders = DefaultExcludeFolders; + std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions; + ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions); + UploadFolder(*Output, Workers, Storage, @@ -3615,7 +3758,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MetaData, m_CreateBuild, m_Clean, - m_UploadToZenCache); + m_UploadToZenCache, + ExcludeFolders, + ExcludeExtensions); if (!AbortFlag) { @@ -3737,6 +3882,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw OptionParseException("'--append' conflicts with '--clean'", SubOption->help()); } + std::vector<std::string> ExcludeFolders = DefaultExcludeFolders; + std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions; + ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions); + DownloadFolder(*Output, Workers, Storage, @@ -3759,7 +3908,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) .ExcludeWildcards = ExcludeWildcards, .MaximumInMemoryPayloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory), .PopulateCache = m_UploadToZenCache, - .AppendNewContent = m_AppendNewContent}); + .AppendNewContent = m_AppendNewContent, + .ExcludeFolders = ExcludeFolders}); if (AbortFlag) { @@ -3769,9 +3919,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_LsOptions) { - if (!IsQuiet) + if (!m_LsResultPath.empty()) { - LogExecutableVersionAndPid(); + if (!IsQuiet) + { + LogExecutableVersionAndPid(); + } } ZenState InstanceState; @@ -3801,7 +3954,30 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::vector<Oid> BuildPartIds = ParseBuildPartIds(); std::vector<std::string> BuildPartNames = ParseBuildPartNames(); - ListBuild(Storage, BuildId, BuildPartIds, 
BuildPartNames, IncludeWildcards, ExcludeWildcards); + std::unique_ptr<CbObjectWriter> StructuredOutput; + if (!m_LsResultPath.empty()) + { + MakeSafeAbsolutePathÍnPlace(m_LsResultPath); + StructuredOutput = std::make_unique<CbObjectWriter>(); + } + + ListBuild(Storage, BuildId, BuildPartIds, BuildPartNames, IncludeWildcards, ExcludeWildcards, StructuredOutput.get()); + + if (StructuredOutput) + { + CbObject Response = StructuredOutput->Save(); + if (ToLower(m_LsResultPath.extension().string()) == ".cbo") + { + MemoryView ResponseView = Response.GetView(); + WriteFile(m_LsResultPath, IoBuffer(IoBuffer::Wrap, ResponseView.GetData(), ResponseView.GetSize())); + } + else + { + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(Response.GetView(), SB); + WriteFile(m_LsResultPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + } + } if (AbortFlag) { @@ -3825,7 +4001,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ParsePath(); ParseDiffPath(); - DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked); + std::vector<std::string> ExcludeFolders = DefaultExcludeFolders; + std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions; + ParseExcludeFolderAndExtension(ExcludeFolders, ExcludeExtensions); + + DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked, ExcludeFolders, ExcludeExtensions); if (AbortFlag) { throw std::runtime_error("Diff folders aborted"); @@ -4245,7 +4425,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MetaData, true, false, - m_UploadToZenCache); + m_UploadToZenCache, + DefaultExcludeFolders, + DefaultExcludeExtensions); if (AbortFlag) { throw std::runtime_error("Test aborted. 
(Upload build)"); @@ -4542,7 +4724,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MetaData2, true, false, - m_UploadToZenCache); + m_UploadToZenCache, + DefaultExcludeFolders, + DefaultExcludeExtensions); if (AbortFlag) { throw std::runtime_error("Test aborted. (Upload scrambled)"); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 081a3a460..80c64c48d 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -90,6 +90,12 @@ private: std::filesystem::path m_Path; + std::string m_IncludeWildcard; + std::string m_ExcludeWildcard; + + std::string m_ExcludeFolders; + std::string m_ExcludeExtensions; + cxxopts::Options m_UploadOptions{"upload", "Upload a folder"}; uint64_t m_FindBlockMaxCount = 10000; bool m_PostUploadVerify = false; @@ -100,9 +106,8 @@ private: bool m_PostDownloadVerify = false; bool m_EnableScavenging = true; - cxxopts::Options m_LsOptions{"ls", "List the content of uploaded build"}; - std::string m_IncludeWildcard; - std::string m_ExcludeWildcard; + cxxopts::Options m_LsOptions{"ls", "List the content of uploaded build"}; + std::filesystem::path m_LsResultPath; cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"}; std::filesystem::path m_DiffPath; diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 15fb1f16c..4885fd363 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -47,12 +47,13 @@ namespace { #define ZEN_CLOUD_STORAGE "Cloud Storage" - void WriteAuthOptions(CbObjectWriter& Writer, - std::string_view JupiterOpenIdProvider, - std::string_view JupiterAccessToken, - std::string_view JupiterAccessTokenEnv, - std::string_view JupiterAccessTokenPath, - std::string_view OidcTokenAuthExecutablePath) + void WriteAuthOptions(CbObjectWriter& Writer, + std::string_view JupiterOpenIdProvider, + std::string_view JupiterAccessToken, + std::string_view JupiterAccessTokenEnv, + std::string_view 
JupiterAccessTokenPath, + std::string_view OidcTokenAuthExecutablePath, + cxxopts::Options& Options) { if (!JupiterOpenIdProvider.empty()) { @@ -87,6 +88,11 @@ namespace { { Writer.AddString("oidc-exe-path"sv, OidcTokenExePath.generic_string()); } + else if (!OidcTokenAuthExecutablePath.empty()) + { + throw OptionParseException(fmt::format("'--oidctoken-exe-path' ('{}') does not exist", OidcTokenAuthExecutablePath), + Options.help()); + } } IoBuffer MakeCbObjectPayload(std::function<void(CbObjectWriter& Writer)> WriteCB) @@ -1234,7 +1240,8 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg m_JupiterAccessToken, m_JupiterAccessTokenEnv, m_JupiterAccessTokenPath, - m_OidcTokenAuthExecutablePath); + m_OidcTokenAuthExecutablePath, + m_Options); if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); @@ -1270,7 +1277,8 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg m_JupiterAccessToken, m_JupiterAccessTokenEnv, m_JupiterAccessTokenPath, - m_OidcTokenAuthExecutablePath); + m_OidcTokenAuthExecutablePath, + m_Options); if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); @@ -1664,7 +1672,8 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg m_JupiterAccessToken, m_JupiterAccessTokenEnv, m_JupiterAccessTokenPath, - m_OidcTokenAuthExecutablePath); + m_OidcTokenAuthExecutablePath, + m_Options); if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); @@ -1689,7 +1698,8 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg m_JupiterAccessToken, m_JupiterAccessTokenEnv, m_JupiterAccessTokenPath, - m_OidcTokenAuthExecutablePath); + m_OidcTokenAuthExecutablePath, + m_Options); if (m_JupiterAssumeHttp2) { Writer.AddBool("assumehttp2"sv, true); @@ -2634,8 +2644,6 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a BuildId, {.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = 
m_ForceDownload, .TempFolderPath = StorageTempPath}); - const Oid OplogBuildPartId = State.GetBuildPartId(); - if (!m_Attachments.empty()) { if (m_AttachmentsPath.empty()) diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 4d4966222..c03ae476f 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -296,11 +296,15 @@ ZenCmdBase::LogExecutableVersionAndPid() int main(int argc, char** argv) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + zen::SetCurrentThreadName("main"); std::vector<std::string> Args; #if ZEN_PLATFORM_WINDOWS - LPWSTR RawCommandLine = GetCommandLine(); + LPWSTR RawCommandLine = GetCommandLineW(); std::string CommandLine = zen::WideToUtf8(RawCommandLine); Args = zen::ParseCommandLine(CommandLine); #else diff --git a/src/zencore-test/zencore-test.cpp b/src/zencore-test/zencore-test.cpp index 327550b32..68fc940ee 100644 --- a/src/zencore-test/zencore-test.cpp +++ b/src/zencore-test/zencore-test.cpp @@ -19,6 +19,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zencore_forcelinktests(); diff --git a/src/zencore/commandline.cpp b/src/zencore/commandline.cpp index 78260aeef..426cf23d6 100644 --- a/src/zencore/commandline.cpp +++ b/src/zencore/commandline.cpp @@ -22,6 +22,10 @@ void IterateCommandlineArgs(std::function<void(const std::string_view& Arg)>& ProcessArg) { #if ZEN_PLATFORM_WINDOWS + // It might seem odd to do this here in addition to at start of main functions but the InitGMalloc() function is called before main (via + // static data) so we need to make sure we set the locale before parsing the command line + setlocale(LC_ALL, "en_us.UTF8"); + int ArgC = 0; const LPWSTR CmdLine = ::GetCommandLineW(); const LPWSTR* ArgV = ::CommandLineToArgvW(CmdLine, &ArgC); diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 7f341818b..4da412c17 
100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -2669,36 +2669,41 @@ GetDirectoryContent(const std::filesystem::path& RootDir, } if (EnumHasAnyFlags(Flags, DirectoryContentFlags::Recursive)) { - PendingWorkCount.AddCount(1); - try - { - WorkerPool.ScheduleWork( - [WorkerPool = &WorkerPool, - PendingWorkCount = &PendingWorkCount, - Visitor = Visitor, - Flags = Flags, - Path = std::move(Path), - RelativeRoot = RelativeRoot / DirectoryName]() { - ZEN_ASSERT(Visitor); - auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); - try - { - MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(Path, SubVisitor); - Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); - } - }, - WorkerThreadPool::EMode::DisableBacklog); - } - catch (const std::exception& Ex) + if (Visitor->AsyncAllowDirectory(Parent, DirectoryName)) { - ZEN_ERROR("Failed scheduling work to scan folder '{}'. Reason: '{}'", Path, Ex.what()); - PendingWorkCount.CountDown(); + PendingWorkCount.AddCount(1); + try + { + WorkerPool.ScheduleWork( + [WorkerPool = &WorkerPool, + PendingWorkCount = &PendingWorkCount, + Visitor = Visitor, + Flags = Flags, + Path = std::move(Path), + RelativeRoot = RelativeRoot / DirectoryName]() { + ZEN_ASSERT(Visitor); + auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); + try + { + MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(Path, SubVisitor); + Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. 
Reason: '{}'", + Path / RelativeRoot, + Ex.what()); + } + }, + WorkerThreadPool::EMode::DisableBacklog); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling work to scan folder '{}'. Reason: '{}'", Path, Ex.what()); + PendingWorkCount.CountDown(); + } } } return false; diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index b4906aebf..938a05b59 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -368,6 +368,11 @@ public: std::vector<std::filesystem::path> DirectoryNames; std::vector<uint32_t> DirectoryAttributes; }; + virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const + { + ZEN_UNUSED(Parent, DirectoryName); + return true; + } virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) = 0; }; diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index 93f8add0a..4379f2f80 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -821,19 +821,66 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func) while (It != End) { - if (*It == Delim) + std::string_view Remaining{It, size_t(ptrdiff_t(End - It))}; + size_t Idx = Remaining.find(Delim, 0); + if (Idx == 0) { It++; continue; } + size_t DelimSize = 0; + + if (Idx == std::string_view::npos) + { + Idx = Remaining.size(); + } + else + { + DelimSize = 1; + } + + Count++; + std::string_view Token{It, Idx}; + if (!Func(Token)) + { + break; + } + + It = It + (Idx + DelimSize); + } + + return Count; +} + +template<typename Fn> +uint32_t +ForEachStrTok(const std::string_view& Str, const char* Delims, Fn&& Func) +{ + const char* It = Str.data(); + const char* End = It + Str.length(); + uint32_t Count = 0; + + while (It != End) + { std::string_view Remaining{It, size_t(ptrdiff_t(End - It))}; - size_t Idx = 
Remaining.find(Delim, 0); + size_t Idx = Remaining.find_first_of(Delims, 0); + if (Idx == 0) + { + It++; + continue; + } + + size_t DelimSize = 0; if (Idx == std::string_view::npos) { Idx = Remaining.size(); } + else + { + DelimSize = 1; + } Count++; std::string_view Token{It, Idx}; @@ -842,7 +889,7 @@ ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func) break; } - It = It + Idx; + It = It + (Idx + DelimSize); } return Count; diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp index c8c7c2cde..0ee863b74 100644 --- a/src/zencore/string.cpp +++ b/src/zencore/string.cpp @@ -1067,6 +1067,42 @@ TEST_CASE("string") TokenCount = ForEachStrTok(""sv, ',', [](const std::string_view&) { return true; }); CHECK(TokenCount == ExpectedTokenCount); } + + SUBCASE("ForEachStrTok2") + { + const auto Tokens = "here,is;my,different tokens"sv; + const auto ExpectedTokens = "here,is,my,different,tokens"sv; + int32_t ExpectedTokenCount = 5; + int32_t TokenCount = 0; + StringBuilder<512> Sb; + + TokenCount = ForEachStrTok(Tokens, " ,;", [&Sb](const std::string_view& Token) { + if (Sb.Size()) + { + Sb << ","; + } + Sb << Token; + return true; + }); + + CHECK(TokenCount == ExpectedTokenCount); + CHECK(Sb.ToString() == ExpectedTokens); + + ExpectedTokenCount = 1; + const auto Str = "mosdef"sv; + + Sb.Reset(); + TokenCount = ForEachStrTok(Str, " ,;", [&Sb](const std::string_view& Token) { + Sb << Token; + return true; + }); + CHECK(Sb.ToString() == Str); + CHECK(TokenCount == ExpectedTokenCount); + + ExpectedTokenCount = 0; + TokenCount = ForEachStrTok(""sv, " ,;", [](const std::string_view&) { return true; }); + CHECK(TokenCount == ExpectedTokenCount); + } } #endif diff --git a/src/zenhttp-test/zenhttp-test.cpp b/src/zenhttp-test/zenhttp-test.cpp index d18b2167e..c18759beb 100644 --- a/src/zenhttp-test/zenhttp-test.cpp +++ b/src/zenhttp-test/zenhttp-test.cpp @@ -15,6 +15,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS 
+ setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zenhttp_forcelinktests(); diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp index 987cdd706..5d92b3b6b 100644 --- a/src/zenhttp/clients/httpclientcpr.cpp +++ b/src/zenhttp/clients/httpclientcpr.cpp @@ -163,7 +163,7 @@ CprHttpClient::~CprHttpClient() HttpClient::Response CprHttpClient::ResponseWithPayload(std::string_view SessionId, - cpr::Response& HttpResponse, + cpr::Response&& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { @@ -235,11 +235,7 @@ CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpRe } else { - return ResponseWithPayload( - SessionId, - HttpResponse, - WorkResponseCode, - Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size())); + return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload)); } } diff --git a/src/zenhttp/clients/httpclientcpr.h b/src/zenhttp/clients/httpclientcpr.h index 94d57fb43..40af53b5d 100644 --- a/src/zenhttp/clients/httpclientcpr.h +++ b/src/zenhttp/clients/httpclientcpr.h @@ -160,7 +160,7 @@ private: HttpClient::Response CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload); HttpClient::Response ResponseWithPayload(std::string_view SessionId, - cpr::Response& HttpResponse, + cpr::Response&& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload); }; diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index a0431b0b1..be4e73576 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -4,6 +4,7 @@ #include "httptracer.h" #include <zencore/except.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> #include <zencore/thread.h> @@ -13,6 +14,8 @@ #include "httpparser.h" +#include 
<EASTL/fixed_vector.h> + #include <deque> #include <memory> #include <string_view> @@ -28,6 +31,7 @@ ZEN_THIRD_PARTY_INCLUDES_START # include <errno.h> #endif #include <asio.hpp> +#include <asio/stream_file.hpp> ZEN_THIRD_PARTY_INCLUDES_END #define ASIO_VERBOSE_TRACE 0 @@ -154,6 +158,345 @@ Log() ////////////////////////////////////////////////////////////////////////// +#if !defined(ASIO_HAS_FILE) +# define ASIO_HAS_FILE 0 +#endif + +#if defined(ASIO_HAS_WINDOWS_OVERLAPPED_PTR) +# define ZEN_USE_TRANSMITFILE 1 +# define ZEN_USE_ASYNC_SENDFILE ASIO_HAS_FILE +#else +# define ZEN_USE_TRANSMITFILE 0 +# define ZEN_USE_ASYNC_SENDFILE 0 +#endif + +#if ZEN_USE_TRANSMITFILE +template<typename Handler> +void +TransmitFileAsync(asio::ip::tcp::socket& Socket, HANDLE FileHandle, uint64_t ByteOffset, uint32_t ByteSize, Handler&& Cb) +{ + // We need to establish a new handle here to avoid running into random errors + // during TransmitFile. I'm not entirely sure why it's necessary yet. + + HANDLE hReopenedFile = + ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); + + const uint64_t FileSize = FileSizeFromHandle(FileHandle); + const uint64_t SendEndOffset = ByteOffset + ByteSize; + + if (SendEndOffset > FileSize) + { + std::error_code DummyEc; + + ZEN_WARN("TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) tries to transmit {} bytes too many", + ByteOffset, + ByteSize, + PathFromHandle(FileHandle, DummyEc), + FileSizeFromHandle(FileHandle), + SendEndOffset - FileSize); + } + + asio::windows::overlapped_ptr OverlappedPtr( + Socket.get_executor(), + [WrappedCb = std::move(Cb), ExpectedBytes = ByteSize, FileHandle, ByteOffset, ByteSize, hReopenedFile]( + const std::error_code& Ec, + std::size_t ActualBytesTransferred) { + if (Ec) + { + std::error_code DummyEc; + ZEN_WARN("NOTE: TransmitFileAsync (offset {:#x}, size {:#x}) file: '{}' (size {:#x})) error '{}', transmitted {} bytes", + 
ByteOffset, + ByteSize, + PathFromHandle(hReopenedFile, DummyEc), + FileSizeFromHandle(hReopenedFile), + Ec.message(), + ActualBytesTransferred); + } + + CloseHandle(hReopenedFile); + WrappedCb(Ec, ActualBytesTransferred); + }); + + OVERLAPPED* RawOverlapped = OverlappedPtr.get(); + RawOverlapped->Offset = uint32_t(ByteOffset & 0xffffFFFFull); + RawOverlapped->OffsetHigh = uint32_t(ByteOffset >> 32); + + const DWORD NumberOfBytesPerSend = 0; // let TransmitFile decide + + const BOOL Ok = + ::TransmitFile(Socket.native_handle(), hReopenedFile, ByteSize, NumberOfBytesPerSend, RawOverlapped, nullptr, /* dwReserved */ 0); + const DWORD LastError = ::GetLastError(); + + // Check if the operation completed immediately. + if (!Ok && LastError != ERROR_IO_PENDING) + { + // The operation completed immediately, so a completion notification needs + // to be posted. When complete() is called, ownership of the OVERLAPPED- + // derived object passes to the io_context. + + asio::error_code ec(LastError, asio::error::get_system_category()); + OverlappedPtr.complete(ec, 0); + } + else + { + // The operation was successfully initiated, so ownership of the + // OVERLAPPED-derived object has passed to the io_context. + + OverlappedPtr.release(); + } +} +#endif // ZEN_USE_TRANSMITFILE + +#if ZEN_USE_ASYNC_SENDFILE + +// Pipelined file sender that reads from a file and writes to a socket using two buffers +// to pipeline reads and writes. Unfortunately this strategy can't currently be used on +// non-Windows platforms as they don't currently support async file reads. We'll have +// to build a mechanism using a thread pool for that, perhaps with optional support +// for io_uring where available since that should be the most efficient. +// +// In other words, this is not super useful as Windows already has the TransmitFile +// version above, but it's here for completeness and potential future use on other platforms. 
+ +template<class AsyncWriteStream, class CompletionHandler> +class PipelinedFileSender : public std::enable_shared_from_this<PipelinedFileSender<AsyncWriteStream, CompletionHandler>> +{ +public: + PipelinedFileSender(AsyncWriteStream& WriteSocket, + asio::stream_file&& FileToReadFrom, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& CompletionToken) + : m_WriteSocket(WriteSocket) + , m_SourceFile(std::move(FileToReadFrom)) + , m_Strand(asio::make_strand(m_WriteSocket.get_executor())) + , m_Buffers{std::vector<char>(BufferSize), std::vector<char>(BufferSize)} + , m_CompletionHandler(std::move(CompletionToken)) + , m_TotalBytesToRead(ByteSize) + , m_RemainingBytesToRead(ByteSize) + { + } + + void Start() { EnqueueRead(); } + +private: + struct Slot + { + std::size_t Size = 0; // valid bytes in buffer + bool Ready = false; // has unwritten data + bool InUse = false; // being written + }; + + void OnSendCompleted(const std::error_code& Ec) + { + // TODO: ensure this behaves properly for instance if the write fails while a read is pending + + if (m_SendCompleted) + { + return; + } + + m_SendCompleted = true; + + // Ensure completion runs on the strand/executor. 
+ + asio::dispatch(m_Strand, [CompletionHandler = std::move(m_CompletionHandler), Ec, TotalBytesWritten = m_TotalBytesWritten]() { + CompletionHandler(Ec, TotalBytesWritten); + }); + } + + void EnqueueRead() + { + asio::dispatch(m_Strand, [self = this->shared_from_this()] { + self->TryPostRead(); + self->PumpWrites(); + }); + } + + void TryPostRead() + { + if (m_IsEof || m_ReadInFlight || m_RemainingBytesToRead == 0) + { + return; + } + + const int ReadSlotIndex = GetFreeSlotIndex(); + if (ReadSlotIndex < 0) + { + // no free slot; wait for writer to free one (not meant to ever happen) + return; + } + + m_ReadInFlight = true; + auto ReadBuffer = asio::buffer(m_Buffers[ReadSlotIndex].data(), zen::Min(m_Buffers[ReadSlotIndex].size(), m_RemainingBytesToRead)); + + asio::async_read( + m_SourceFile, + ReadBuffer, + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), ReadSlotIndex](const std::error_code& Ec, std::size_t ActualBytesRead) { + self->m_ReadInFlight = false; + self->m_RemainingBytesToRead -= ActualBytesRead; + + if (Ec) + { + if (Ec == asio::error::eof) + { + ZEN_ASSERT(self->m_RemainingBytesToRead == 0); + + self->m_IsEof = true; + + // No data produced on EOF; just try to pump whatever is left + self->PumpWrites(); + } + else + { + // read error, cancel everything and let outer completion handler know + self->OnSendCompleted(Ec); + } + } + else + { + // Mark slot as ready with ActualBytesRead valid bytes of data in buffer + self->m_Slots[ReadSlotIndex].Size = ActualBytesRead; + self->m_Slots[ReadSlotIndex].Ready = true; + self->PumpWrites(); + self->TryPostRead(); + } + })); + } + + void PumpWrites() + { + if (m_WriteInFlight) + { + return; + } + + const int WriteSlotIndex = GetReadySlotIndex(); + if (WriteSlotIndex < 0) + { + // No ready data. 
We're done if EOF/no more data to read and no reads in flight and nothing ready + if (!m_ReadInFlight && (m_IsEof || m_RemainingBytesToRead == 0)) + { + // all done + return OnSendCompleted({}); + } + + return; + } + + m_WriteInFlight = true; + m_Slots[WriteSlotIndex].InUse = true; + + asio::async_write( + m_WriteSocket, + asio::buffer(m_Buffers[WriteSlotIndex].data(), m_Slots[WriteSlotIndex].Size), + asio::bind_executor(m_Strand, + [self = this->shared_from_this(), WriteSlotIndex](const std::error_code& Ec, std::size_t BytesWritten) { + self->m_TotalBytesWritten += BytesWritten; + self->m_WriteInFlight = false; + + if (Ec) + { + self->OnSendCompleted(Ec); + + return; + } + else + { + // Free the slot + self->m_Slots[WriteSlotIndex].Ready = false; + self->m_Slots[WriteSlotIndex].InUse = false; + self->m_Slots[WriteSlotIndex].Size = 0; + + self->TryPostRead(); + self->PumpWrites(); + } + })); + } + + int GetFreeSlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (!m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + int GetReadySlotIndex() const + { + for (int i = 0; i < 2; ++i) + { + if (m_Slots[i].Ready && !m_Slots[i].InUse) + { + return i; + } + } + return -1; + } + + AsyncWriteStream& m_WriteSocket; + asio::stream_file m_SourceFile; + asio::strand<asio::any_io_executor> m_Strand; + + // There's no synchronization needed for these as all access is via the strand + std::vector<char> m_Buffers[2]; + Slot m_Slots[2]; + + bool m_IsEof = false; + bool m_ReadInFlight = false; + bool m_WriteInFlight = false; + bool m_SendCompleted = false; + + const uint64_t m_TotalBytesToRead = 0; + uint64_t m_RemainingBytesToRead = 0; + uint64_t m_TotalBytesWritten = 0; + + CompletionHandler m_CompletionHandler; +}; + +template<class AsyncWriteStream, class CompletionHandler> +void +SendFileAsync(AsyncWriteStream& WriteSocket, + const auto FileHandle, + uint64_t ByteOffset, + uint64_t ByteSize, + std::size_t BufferSize, + CompletionHandler&& 
CompletionToken) +{ + HANDLE hReopenedFile = + ReOpenFile(FileHandle, FILE_GENERIC_READ, FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, FILE_FLAG_OVERLAPPED); + + // Note that this assumes ownership of the handle + asio::stream_file SourceFile(WriteSocket.get_executor(), hReopenedFile); + + // TODO: handle any error properly here + SourceFile.seek(ByteOffset, asio::stream_file::seek_set); + + if (BufferSize > ByteSize) + { + BufferSize = ByteSize; + } + + auto op = std::make_shared<PipelinedFileSender<AsyncWriteStream, std::decay_t<CompletionHandler>>>( + WriteSocket, + std::move(SourceFile), + ByteSize, + BufferSize, + std::forward<CompletionHandler>(CompletionToken)); + + // Start the pipeline + op->Start(); +} +#endif // ZEN_USE_ASYNC_SENDFILE + +////////////////////////////////////////////////////////////////////////// + struct HttpAsioServerImpl { public: @@ -191,7 +534,7 @@ public: class HttpAsioServerRequest : public HttpServerRequest { public: - HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer); + HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer, uint32_t RequestNumber); ~HttpAsioServerRequest(); virtual Oid ParseSessionId() const override; @@ -210,6 +553,7 @@ public: HttpAsioServerRequest& operator=(const HttpAsioServerRequest&) = delete; HttpRequestParser& m_Request; + uint32_t m_RequestNumber = 0; // Note: different to request ID which is derived from headers IoBuffer m_PayloadBuffer; std::unique_ptr<HttpResponse> m_Response; }; @@ -218,7 +562,11 @@ struct HttpResponse { public: HttpResponse() = default; - explicit HttpResponse(HttpContentType ContentType) : m_ContentType(ContentType) {} + explicit HttpResponse(HttpContentType ContentType, uint32_t RequestNumber) : m_RequestNumber(RequestNumber), m_ContentType(ContentType) + { + } + + ~HttpResponse() = default; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> BlobList) { @@ -230,57 
+578,61 @@ public: const uint32_t ChunkCount = gsl::narrow<uint32_t>(BlobList.size()); m_DataBuffers.reserve(ChunkCount); + m_IoVecCount = ChunkCount + 1 /* one extra buffer for headers */; + m_IoVecs.resize(m_IoVecCount); - for (IoBuffer& Buffer : BlobList) - { -#if 1 - m_DataBuffers.emplace_back(std::move(Buffer)).MakeOwned(); -#else - IoBuffer TempBuffer = std::move(Buffer); - TempBuffer.MakeOwned(); - m_DataBuffers.emplace_back(IoBufferBuilder::ReadFromFileMaybe(TempBuffer)); -#endif - } + m_IoVecCursor = 0; uint64_t LocalDataSize = 0; + int Index = 1; - m_AsioBuffers.push_back({}); // Placeholder for header - - for (IoBuffer& Buffer : m_DataBuffers) + for (IoBuffer& Buffer : BlobList) { - uint64_t BufferDataSize = Buffer.Size(); + const uint64_t BufferDataSize = Buffer.Size(); ZEN_ASSERT(BufferDataSize); LocalDataSize += BufferDataSize; - IoBufferFileReference FileRef; - if (Buffer.GetFileReference(/* out */ FileRef)) - { - // TODO: Use direct file transfer, via TransmitFile/sendfile - // - // this looks like it requires some custom asio plumbing however + IoBuffer OwnedBuffer = std::move(Buffer); + OwnedBuffer.MakeOwned(); - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); - } - else + IoVec& Io = m_IoVecs[Index++]; + + bool ChunkHandled = false; + +#if ZEN_USE_TRANSMITFILE || ZEN_USE_ASYNC_SENDFILE + if (IoBufferFileReference FileRef; OwnedBuffer.GetFileReference(/* out */ FileRef)) { - // Send from memory + Io.IsFileRef = true; + Io.Ref.FileRef = FileRef; + ChunkHandled = true; + } +#endif - m_AsioBuffers.push_back({Buffer.Data(), Buffer.Size()}); + if (!ChunkHandled) + { + Io.IsFileRef = false; + uint32_t Size = gsl::narrow<uint32_t>(OwnedBuffer.Size()); + Io.Ref.MemoryRef = {OwnedBuffer.Data(), Size}; } + + m_DataBuffers.emplace_back(OwnedBuffer); } + m_ContentLength = LocalDataSize; std::string_view Headers = GetHeaders(); - m_AsioBuffers[0] = asio::const_buffer(Headers.data(), Headers.size()); + + IoVec& Io = m_IoVecs[0]; + + Io.IsFileRef = 
false; + Io.Ref.MemoryRef = {.Data = Headers.data(), .Size = gsl::narrow_cast<uint32_t>(Headers.size())}; } uint16_t ResponseCode() const { return m_ResponseCode; } uint64_t ContentLength() const { return m_ContentLength; } - const std::vector<asio::const_buffer>& AsioBuffers() const { return m_AsioBuffers; } - std::string_view GetHeaders() { ZEN_MEMSCOPE(GetHttpasioTag()); @@ -299,16 +651,137 @@ public: return m_Headers; } - void SuppressPayload() { m_AsioBuffers.resize(1); } + void SendResponse(asio::ip::tcp::socket& TcpSocket, std::function<void(const asio::error_code& Ec, std::size_t ByteCount)>&& Token) + { + m_SendCb = std::move(Token); + + SendNextChunk(TcpSocket); + } + + void SendNextChunk(asio::ip::tcp::socket& TcpSocket) + { + if (m_IoVecCursor == m_IoVecCount) + { + ZEN_ASSERT(m_SendCb); + + auto CompletionToken = [Self = this, Token = std::move(m_SendCb), TotalBytes = m_TotalBytesSent] { Token({}, TotalBytes); }; + + asio::defer(TcpSocket.get_executor(), std::move(CompletionToken)); + + return; + } + + const IoVec& Io = m_IoVecs[m_IoVecCursor++]; + + if (Io.IsFileRef) + { + ZEN_TRACE_VERBOSE("SendNextChunk from FILE, thread: {}, bytes: {}", zen::GetCurrentThreadId(), Io.Ref.FileRef.FileChunkSize); + +#if ZEN_USE_TRANSMITFILE + TransmitFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + gsl::narrow_cast<uint32_t>(Io.Ref.FileRef.FileChunkSize), + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); +#elif ZEN_USE_ASYNC_SENDFILE + SendFileAsync(TcpSocket, + Io.Ref.FileRef.FileHandle, + Io.Ref.FileRef.FileChunkOffset, + Io.Ref.FileRef.FileChunkSize, + 64 * 1024, + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); +#else 
+ // This should never occur unless we compile with one + // of the options above + ZEN_ASSERT("invalid file reference in response"); +#endif + + return; + } + + // Send as many consecutive non-file references as possible in one asio operation + + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer{Io.Ref.MemoryRef.Data, Io.Ref.MemoryRef.Size}); + + while (m_IoVecCursor != m_IoVecCount) + { + const IoVec& Io2 = m_IoVecs[m_IoVecCursor]; + + if (Io2.IsFileRef) + { + break; + } + + AsioBuffers.push_back(asio::const_buffer{Io2.Ref.MemoryRef.Data, Io2.Ref.MemoryRef.Size}); + ++m_IoVecCursor; + } + + asio::async_write(TcpSocket, + std::move(AsioBuffers), + asio::transfer_all(), + [this, &TcpSocket](const asio::error_code& Ec, std::size_t ByteCount) { + m_TotalBytesSent += ByteCount; + if (Ec) + { + m_SendCb(Ec, m_TotalBytesSent); + } + else + { + SendNextChunk(TcpSocket); + } + }); + } private: - uint16_t m_ResponseCode = 0; - bool m_IsKeepAlive = true; - HttpContentType m_ContentType = HttpContentType::kBinary; - uint64_t m_ContentLength = 0; - std::vector<IoBuffer> m_DataBuffers; - std::vector<asio::const_buffer> m_AsioBuffers; - ExtendableStringBuilder<160> m_Headers; + uint32_t m_RequestNumber = 0; + uint16_t m_ResponseCode = 0; + bool m_IsKeepAlive = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + uint64_t m_ContentLength = 0; + eastl::fixed_vector<IoBuffer, 8> m_DataBuffers; // This is here to keep the IoBuffer buffers/handles alive + ExtendableStringBuilder<160> m_Headers; + + struct IoVec + { + bool IsFileRef; + union + { + struct MemoryBuffer + { + const void* Data; + uint32_t Size; + } MemoryRef; + IoBufferFileReference FileRef; + } Ref; + }; + + eastl::fixed_vector<IoVec, 8> m_IoVecs; + int m_IoVecCursor = 0; + int m_IoVecCount = 0; + + std::function<void(const asio::error_code& Ec, std::size_t ByteCount)> m_SendCb; + uint64_t m_TotalBytesSent = 0; }; 
////////////////////////////////////////////////////////////////////////// @@ -339,37 +812,63 @@ private: kTerminated }; + const char* StateToString(RequestState State) + { + switch (State) + { + case RequestState::kInitialState: + return "InitialState"; + case RequestState::kInitialRead: + return "InitialRead"; + case RequestState::kReadingMore: + return "ReadingMore"; + case RequestState::kWriting: + return "Writing"; + case RequestState::kWritingFinal: + return "WritingFinal"; + case RequestState::kDone: + return "Done"; + case RequestState::kTerminated: + return "Terminated"; + default: + return "Unknown"; + } + } + RequestState m_RequestState = RequestState::kInitialState; HttpRequestParser m_RequestData{*this}; void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); - void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); + void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, uint32_t RequestNumber, HttpResponse* ResponseToPop); void CloseConnection(); - HttpAsioServerImpl& m_Server; - asio::streambuf m_RequestBuffer; - std::unique_ptr<asio::ip::tcp::socket> m_Socket; - std::atomic<uint32_t> m_RequestCounter{0}; - uint32_t m_ConnectionId = 0; - Ref<IHttpPackageHandler> m_PackageHandler; + HttpAsioServerImpl& m_Server; + asio::streambuf m_RequestBuffer; + std::atomic<uint32_t> m_RequestCounter{0}; + uint32_t m_ConnectionId = 0; + Ref<IHttpPackageHandler> m_PackageHandler; - RwLock m_ResponsesLock; - std::deque<std::unique_ptr<HttpResponse>> m_Responses; + RwLock m_ActiveResponsesLock; + std::deque<std::unique_ptr<HttpResponse>> m_ActiveResponses; + + std::unique_ptr<asio::ip::tcp::socket> m_Socket; }; std::atomic<uint32_t> g_ConnectionIdCounter{0}; HttpServerConnection::HttpServerConnection(HttpAsioServerImpl& Server, std::unique_ptr<asio::ip::tcp::socket>&& Socket) : m_Server(Server) -, m_Socket(std::move(Socket)) , 
m_ConnectionId(g_ConnectionIdCounter.fetch_add(1)) +, m_Socket(std::move(Socket)) { ZEN_TRACE_VERBOSE("new connection #{}", m_ConnectionId); } HttpServerConnection::~HttpServerConnection() { + RwLock::ExclusiveLockScope _(m_ActiveResponsesLock); + ZEN_TRACE_VERBOSE("destroying connection #{}", m_ConnectionId); } @@ -434,7 +933,11 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] return; default: - ZEN_WARN("on data received ERROR, connection: {}, reason '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data received ERROR, connection: {} (state: {}), reason '{}'", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message()); + return TerminateConnection(); } } @@ -472,37 +975,58 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] } void -HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unused]] std::size_t ByteCount, bool Pop) +HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, + [[maybe_unused]] std::size_t ByteCount, + [[maybe_unused]] uint32_t RequestNumber, + HttpResponse* ResponseToPop) { ZEN_MEMSCOPE(GetHttpasioTag()); if (Ec) { - ZEN_WARN("on data sent ERROR, connection: {}, reason: '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data sent ERROR, connection: {} (state: {}), reason: '{}' (bytes: {})", + m_ConnectionId, + StateToString(m_RequestState), + Ec.message(), + ByteCount); + TerminateConnection(); + + return; } - else - { - ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", - m_ConnectionId, - m_RequestCounter.load(std::memory_order_relaxed), - zen::GetCurrentThreadId(), - NiceBytes(ByteCount)); - if (!m_RequestData.IsKeepAlive()) - { - CloseConnection(); - } - else - { - if (Pop) + ZEN_TRACE_VERBOSE("on data sent, connection: {}, request: {}, thread: {}, bytes: {}", + m_ConnectionId, + RequestNumber, + zen::GetCurrentThreadId(), + NiceBytes(ByteCount)); + + if (ResponseToPop) + { + 
m_ActiveResponsesLock.WithExclusiveLock([&] { + // Once a response is sent we can release any referenced resources + // + // completion callbacks may be issued out-of-order so we need to + // remove the relevant entry from our active response list, it may + // not be the first + + if (auto It = find_if(begin(m_ActiveResponses), + end(m_ActiveResponses), + [ResponseToPop](const auto& Item) { return Item.get() == ResponseToPop; }); + It != end(m_ActiveResponses)) + { + m_ActiveResponses.erase(It); + } + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.pop_front(); + ZEN_WARN("response not found"); } + }); + } - m_RequestCounter.fetch_add(1); - } + if (!m_RequestData.IsKeepAlive()) + { + CloseConnection(); } } @@ -553,13 +1077,13 @@ HttpServerConnection::HandleRequest() m_RequestState = RequestState::kWriting; } + const uint32_t RequestNumber = m_RequestCounter.fetch_add(1); + if (HttpService* Service = m_Server.RouteRequest(m_RequestData.Url())) { ZEN_TRACE_CPU("asio::HandleRequest"); - const uint32_t RequestNumber = m_RequestCounter.load(std::memory_order_relaxed); - - HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body()); + HttpAsioServerRequest Request(m_RequestData, *Service, m_RequestData.Body(), RequestNumber); ZEN_TRACE_VERBOSE("handle request, connection: {}, request: {}'", m_ConnectionId, RequestNumber); @@ -635,34 +1159,38 @@ HttpServerConnection::HandleRequest() if (m_RequestData.RequestVerb() == HttpVerb::kHead) { - Response->SuppressPayload(); - } + ZEN_TRACE_CPU("asio::async_write"); - const std::vector<asio::const_buffer>& ResponseBuffers = Response->AsioBuffers(); + std::string_view Headers = Response->GetHeaders(); - uint64_t ResponseLength = 0; + std::vector<asio::const_buffer> AsioBuffers; + AsioBuffers.push_back(asio::const_buffer(Headers.data(), Headers.size())); - for (const asio::const_buffer& Buffer : ResponseBuffers) - { - ResponseLength += Buffer.size(); + asio::async_write(*m_Socket.get(), + 
AsioBuffers, + asio::transfer_all(), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } - + else { - RwLock::ExclusiveLockScope _(m_ResponsesLock); - m_Responses.push_back(std::move(Response)); - } + ZEN_TRACE_CPU("asio::async_write"); - // TODO: should cork/uncork for Linux? + HttpResponse* ResponseRaw = Response.get(); - { - ZEN_TRACE_CPU("asio::async_write"); - asio::async_write(*m_Socket.get(), - ResponseBuffers, - asio::transfer_exactly(ResponseLength), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { - Conn->OnResponseDataSent(Ec, ByteCount, true); - }); + m_ActiveResponsesLock.WithExclusiveLock([&] { + // Keep referenced resources alive + m_ActiveResponses.push_back(std::move(Response)); + }); + + ResponseRaw->SendResponse( + *m_Socket, + [Conn = AsSharedPtr(), ResponseRaw, RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ ResponseRaw); + }); } + return; } } @@ -681,10 +1209,11 @@ HttpServerConnection::HandleRequest() "\r\n"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } else { @@ -706,10 +1235,11 @@ HttpServerConnection::HandleRequest() "No suitable route found"sv; } - asio::async_write( - *m_Socket.get(), - asio::buffer(Response), - [Conn = AsSharedPtr()](const asio::error_code& Ec, std::size_t ByteCount) { Conn->OnResponseDataSent(Ec, ByteCount); }); + asio::async_write(*m_Socket.get(), + 
asio::buffer(Response), + [Conn = AsSharedPtr(), RequestNumber](const asio::error_code& Ec, std::size_t ByteCount) { + Conn->OnResponseDataSent(Ec, ByteCount, RequestNumber, /* ResponseToPop */ nullptr); + }); } } @@ -1016,9 +1546,13 @@ private: ////////////////////////////////////////////////////////////////////////// -HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, HttpService& Service, IoBuffer PayloadBuffer) +HttpAsioServerRequest::HttpAsioServerRequest(HttpRequestParser& Request, + HttpService& Service, + IoBuffer PayloadBuffer, + uint32_t RequestNumber) : HttpServerRequest(Service) , m_Request(Request) +, m_RequestNumber(RequestNumber) , m_PayloadBuffer(std::move(PayloadBuffer)) { const int PrefixLength = Service.UriPrefixLength(); @@ -1104,7 +1638,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode) ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(HttpContentType::kBinary)); + m_Response.reset(new HttpResponse(HttpContentType::kBinary, m_RequestNumber)); std::array<IoBuffer, 0> Empty; m_Response->InitializeForPayload((uint16_t)ResponseCode, Empty); @@ -1117,7 +1651,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); m_Response->InitializeForPayload((uint16_t)ResponseCode, Blobs); } @@ -1127,7 +1661,7 @@ HttpAsioServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentT ZEN_MEMSCOPE(GetHttpasioTag()); ZEN_ASSERT(!m_Response); - m_Response.reset(new HttpResponse(ContentType)); + m_Response.reset(new HttpResponse(ContentType, m_RequestNumber)); IoBuffer MessageBuffer(IoBuffer::Wrap, ResponseString.data(), ResponseString.size()); std::array<IoBuffer, 1> SingleBufferList({MessageBuffer}); diff --git a/src/zennet-test/zennet-test.cpp b/src/zennet-test/zennet-test.cpp index 5e4d29220..bc3b8e8e9 100644 --- 
a/src/zennet-test/zennet-test.cpp +++ b/src/zennet-test/zennet-test.cpp @@ -16,6 +16,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char** argv) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zennet_forcelinktests(); diff --git a/src/zenremotestore-test/zenremotestore-test.cpp b/src/zenremotestore-test/zenremotestore-test.cpp index a49c6d273..5db185041 100644 --- a/src/zenremotestore-test/zenremotestore-test.cpp +++ b/src/zenremotestore-test/zenremotestore-test.cpp @@ -17,6 +17,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zenremotestore_forcelinktests(); diff --git a/src/zenremotestore/builds/buildsavedstate.cpp b/src/zenremotestore/builds/buildsavedstate.cpp index 5a86ee865..1d1f4605f 100644 --- a/src/zenremotestore/builds/buildsavedstate.cpp +++ b/src/zenremotestore/builds/buildsavedstate.cpp @@ -207,9 +207,14 @@ ReadBuildSaveStateFile(const std::filesystem::path& StateFilePath) { ZEN_TRACE_CPU("ReadStateFile"); - FileContents FileData = ReadFile(StateFilePath); + IoBuffer DataBuffer; + { + BasicFile Source(StateFilePath, BasicFile::Mode::kRead); + DataBuffer = Source.ReadAll(); + } + CbValidateError ValidateError; - if (CbObject CurrentStateObject = ValidateAndReadCompactBinaryObject(FileData.Flatten(), ValidateError); + if (CbObject CurrentStateObject = ValidateAndReadCompactBinaryObject(std::move(DataBuffer), ValidateError); ValidateError == CbValidateError::None) { if (CurrentStateObject) diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index b9f5eb07a..3ca2f72c1 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -782,78 +782,17 @@ BuildsOperationUpdateFolder::Execute(FolderContent& 
OutLocalFolderState) Stopwatch LocalTimer; - for (uint32_t LocalSequenceIndex = 0; - LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - LocalSequenceIndex++) - { - const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; - const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; - - { - uint64_t SourceOffset = 0; - const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; - for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) - { - const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; - const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - - if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); - RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + ScavengeSourceForChunks(RemainingChunkCount, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RawHashToCopyChunkDataIndex, + SequenceIndexChunksLeftToWriteCounters, + m_LocalContent, + m_LocalLookup, + CopyChunkDatas, + uint32_t(-1), + m_CacheMappingStats.LocalChunkMatchingRemoteCount, + m_CacheMappingStats.LocalChunkMatchingRemoteByteCount); - if (!ChunkTargetPtrs.empty()) - { - CopyChunkData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = SourceOffset}; - 
if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash); - CopySourceIt != RawHashToCopyChunkDataIndex.end()) - { - CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); - CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); - CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - m_CacheMappingStats.LocalChunkMatchingRemoteCount++; - m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; - RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - RemainingChunkCount--; - } - } - } - SourceOffset += LocalChunkRawSize; - } - } - } m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } @@ -867,86 +806,22 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ScavengedContentIndex++) { const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; - // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; - - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < 
ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - ScavengedSequenceIndex++) - { - const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex]; - - { - uint64_t SourceOffset = 0; - const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex]; - for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++) - { - const uint32_t ScavengedChunkIndex = - ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex]; - const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex]; - const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; - - if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash); - RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - CopyChunkData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); - CopySourceIt != RawHashToCopyChunkDataIndex.end()) - { - CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, - CopyChunkDatas.size()); - 
CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); - CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++; - m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize; - RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - RemainingChunkCount--; - } - } - } - SourceOffset += ScavengedChunkRawSize; - } - } - } + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + ScavengeSourceForChunks(RemainingChunkCount, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RawHashToCopyChunkDataIndex, + SequenceIndexChunksLeftToWriteCounters, + ScavengedContent, + ScavengedLookup, + CopyChunkDatas, + ScavengedContentIndex, + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, + m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount); } m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } + if (!m_Options.IsQuiet) { if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || @@ -2941,6 +2816,81 @@ BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, return true; } +void +BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, 
+ std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, + tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex, + const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters, + const ChunkedFolderContent& ScavengedContent, + const ChunkedContentLookup& ScavengedLookup, + std::vector<CopyChunkData>& InOutCopyChunkDatas, + uint32_t ScavengedContentIndex, + uint64_t& InOutChunkMatchingRemoteCount, + uint64_t& InOutChunkMatchingRemoteByteCount) +{ + for (uint32_t RemoteChunkIndex = 0; + RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0); + RemoteChunkIndex++) + { + if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end()) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + const uint32_t ScavengedChunkIndex = It->second; + const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; + const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex]; + const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation = + ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset]; + const IoHash& ScavengedSequenceRawHash = + ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex]; + + CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .RemoteChunkIndex = RemoteChunkIndex, + .CacheFileOffset = ScavengeLocation.Offset}; + if (auto CopySourceIt = 
InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); + CopySourceIt != InOutRawHashToCopyChunkDataIndex.end()) + { + CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); + InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengeLocation.SequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); + InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengeLocation.SequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + InOutChunkMatchingRemoteCount++; + InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize; + InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; + InOutRemainingChunkCount--; + } + } + } + } +} + std::filesystem::path BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp index 15ece2edd..36b45e800 100644 --- a/src/zenremotestore/builds/buildstorageutil.cpp +++ b/src/zenremotestore/builds/buildstorageutil.cpp @@ -129,7 +129,10 @@ ResolveBuildStorage(OperationLogOutput& Output, TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2, ClientSettings.Verbose); TestResult.Success) { - ZEN_OPERATION_LOG_INFO(Output, "Server 
endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl); + if (Verbose) + { + ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl); + } HostUrl = ServerEndpoint.BaseUrl; HostAssumeHttp2 = ServerEndpoint.AssumeHttp2; @@ -172,7 +175,10 @@ ResolveBuildStorage(OperationLogOutput& Output, TestZenCacheEndpoint(CacheEndpoint.BaseUrl, CacheEndpoint.AssumeHttp2, ClientSettings.Verbose); TestResult.Success) { - ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl); + if (Verbose) + { + ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl); + } CacheUrl = CacheEndpoint.BaseUrl; CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2; @@ -391,7 +397,10 @@ GetBlockDescriptions(OperationLogOutput& Output, [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); ListBlocksIt != FoundBlocks.end()) { - ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash); + if (!IsQuiet) + { + ZEN_OPERATION_LOG_INFO(Output, "Found block {} via context find successfully", BlockHash); + } AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt)); } else diff --git a/src/zenremotestore/chunking/chunkedcontent.cpp b/src/zenremotestore/chunking/chunkedcontent.cpp index e8187d348..fda01aa56 100644 --- a/src/zenremotestore/chunking/chunkedcontent.cpp +++ b/src/zenremotestore/chunking/chunkedcontent.cpp @@ -108,7 +108,7 @@ namespace { uint32_t PathIndex, std::atomic<bool>& AbortFlag) { - ZEN_TRACE_CPU("ChunkFolderContent"); + ZEN_TRACE_CPU("HashOneFile"); const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex]; const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex]; diff --git a/src/zenremotestore/filesystemutils.cpp b/src/zenremotestore/filesystemutils.cpp index 8dff05c6b..fa1ce6f78 100644 --- 
a/src/zenremotestore/filesystemutils.cpp +++ b/src/zenremotestore/filesystemutils.cpp @@ -11,6 +11,11 @@ #include <zencore/timer.h> #include <zencore/trace.h> +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + namespace zen { BufferedOpenFile::BufferedOpenFile(const std::filesystem::path Path, @@ -349,13 +354,14 @@ CleanDirectory( std::atomic<uint64_t> DeletedItemCount = 0; std::atomic<uint64_t> DeletedByteCount = 0; - CleanDirectoryResult Result; - RwLock ResultLock; - auto _ = MakeGuard([&]() { - Result.DeletedCount = DeletedItemCount.load(); - Result.DeletedByteCount = DeletedByteCount.load(); - Result.FoundCount = DiscoveredItemCount.load(); - }); + std::vector<std::filesystem::path> DirectoriesToDelete; + CleanDirectoryResult Result; + RwLock ResultLock; + auto _ = MakeGuard([&]() { + Result.DeletedCount = DeletedItemCount.load(); + Result.DeletedByteCount = DeletedByteCount.load(); + Result.FoundCount = DiscoveredItemCount.load(); + }); ParallelWork Work(AbortFlag, PauseFlag, @@ -363,119 +369,133 @@ CleanDirectory( struct AsyncVisitor : public GetDirectoryContentVisitor { - AsyncVisitor(const std::filesystem::path& InPath, - std::atomic<bool>& InAbortFlag, - std::atomic<uint64_t>& InDiscoveredItemCount, - std::atomic<uint64_t>& InDeletedItemCount, - std::atomic<uint64_t>& InDeletedByteCount, - std::span<const std::string> InExcludeDirectories, - CleanDirectoryResult& InResult, - RwLock& InResultLock) + AsyncVisitor(const std::filesystem::path& InPath, + std::atomic<bool>& InAbortFlag, + std::atomic<uint64_t>& InDiscoveredItemCount, + std::atomic<uint64_t>& InDeletedItemCount, + std::atomic<uint64_t>& InDeletedByteCount, + std::span<const std::string> InExcludeDirectories, + std::vector<std::filesystem::path>& OutDirectoriesToDelete, + CleanDirectoryResult& InResult, + RwLock& InResultLock) : Path(InPath) , AbortFlag(InAbortFlag) , DiscoveredItemCount(InDiscoveredItemCount) , 
DeletedItemCount(InDeletedItemCount) , DeletedByteCount(InDeletedByteCount) , ExcludeDirectories(InExcludeDirectories) + , DirectoriesToDelete(OutDirectoriesToDelete) , Result(InResult) , ResultLock(InResultLock) { } + + virtual bool AsyncAllowDirectory(const std::filesystem::path& Parent, const std::filesystem::path& DirectoryName) const override + { + ZEN_UNUSED(Parent); + + if (AbortFlag) + { + return false; + } + const std::string DirectoryString = DirectoryName.string(); + for (const std::string_view ExcludeDirectory : ExcludeDirectories) + { + if (DirectoryString == ExcludeDirectory) + { + return false; + } + } + return true; + } + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override { ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory"); if (!AbortFlag) { - if (!Content.FileNames.empty()) + DiscoveredItemCount += Content.FileNames.size(); + + ZEN_TRACE_CPU("DeleteFiles"); + std::vector<std::pair<std::filesystem::path, std::error_code>> FailedRemovePaths; + for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++) { - DiscoveredItemCount += Content.FileNames.size(); + const std::filesystem::path& FileName = Content.FileNames[FileIndex]; + const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred(); - const std::string RelativeRootString = RelativeRoot.generic_string(); - bool RemoveContent = true; - for (const std::string_view ExcludeDirectory : ExcludeDirectories) + bool IsRemoved = false; + std::error_code Ec; + (void)SetFileReadOnly(FilePath, false, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) { - if (RelativeRootString.starts_with(ExcludeDirectory)) + if (!IsFileWithRetry(FilePath)) { - if (RelativeRootString.length() > ExcludeDirectory.length()) - { - const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()]; - if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' || - MaybePathDelimiter == 
std::filesystem::path::preferred_separator) - { - RemoveContent = false; - break; - } - } - else - { - RemoveContent = false; - break; - } + IsRemoved = true; + Ec.clear(); + break; } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + (void)SetFileReadOnly(FilePath, false, Ec); } - if (RemoveContent) + if (!IsRemoved && !Ec) { - ZEN_TRACE_CPU("DeleteFiles"); - for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++) + (void)RemoveFile(FilePath, Ec); + for (size_t Retries = 0; Ec && Retries < 6; Retries++) { - const std::filesystem::path& FileName = Content.FileNames[FileIndex]; - const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred(); - - bool IsRemoved = false; - std::error_code Ec; - (void)SetFileReadOnly(FilePath, false, Ec); - for (size_t Retries = 0; Ec && Retries < 3; Retries++) + if (!IsFileWithRetry(FilePath)) { - if (!IsFileWithRetry(FilePath)) - { - IsRemoved = true; - Ec.clear(); - break; - } - Sleep(100 + int(Retries * 50)); + IsRemoved = true; Ec.clear(); - (void)SetFileReadOnly(FilePath, false, Ec); - } - if (!IsRemoved && !Ec) - { - (void)RemoveFile(FilePath, Ec); - for (size_t Retries = 0; Ec && Retries < 6; Retries++) - { - if (!IsFileWithRetry(FilePath)) - { - IsRemoved = true; - Ec.clear(); - return; - } - Sleep(100 + int(Retries * 50)); - Ec.clear(); - (void)RemoveFile(FilePath, Ec); - } - } - if (!IsRemoved && Ec) - { - RwLock::ExclusiveLockScope _(ResultLock); - Result.FailedRemovePaths.push_back(std::make_pair(FilePath, Ec)); - } - else - { - DeletedItemCount++; - DeletedByteCount += Content.FileSizes[FileIndex]; + break; } + Sleep(100 + int(Retries * 50)); + Ec.clear(); + (void)RemoveFile(FilePath, Ec); } } + if (!IsRemoved && Ec) + { + FailedRemovePaths.push_back(std::make_pair(FilePath, Ec)); + } + else + { + DeletedItemCount++; + DeletedByteCount += Content.FileSizes[FileIndex]; + } + } + + if (!FailedRemovePaths.empty()) + { + RwLock::ExclusiveLockScope _(ResultLock); + 
/* Publish this directory's failures: append the local list to the shared result while ResultLock is held */ Result.FailedRemovePaths.insert(Result.FailedRemovePaths.end(), FailedRemovePaths.begin(), FailedRemovePaths.end()); + } + else if (!RelativeRoot.empty()) + { + DiscoveredItemCount++; + RwLock::ExclusiveLockScope _(ResultLock); + DirectoriesToDelete.push_back(RelativeRoot); } } } - const std::filesystem::path& Path; - std::atomic<bool>& AbortFlag; - std::atomic<uint64_t>& DiscoveredItemCount; - std::atomic<uint64_t>& DeletedItemCount; - std::atomic<uint64_t>& DeletedByteCount; - std::span<const std::string> ExcludeDirectories; - CleanDirectoryResult& Result; - RwLock& ResultLock; - } Visitor(Path, AbortFlag, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories, Result, ResultLock); + const std::filesystem::path& Path; + std::atomic<bool>& AbortFlag; + std::atomic<uint64_t>& DiscoveredItemCount; + std::atomic<uint64_t>& DeletedItemCount; + std::atomic<uint64_t>& DeletedByteCount; + std::span<const std::string> ExcludeDirectories; + std::vector<std::filesystem::path>& DirectoriesToDelete; + CleanDirectoryResult& Result; + RwLock& ResultLock; + } Visitor(Path, + AbortFlag, + DiscoveredItemCount, + DeletedItemCount, + DeletedByteCount, + ExcludeDirectories, + DirectoriesToDelete, + Result, + ResultLock); GetDirectoryContent(Path, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes, @@ -483,29 +503,6 @@ CleanDirectory( IOWorkerPool, Work.PendingWork()); - DirectoryContent LocalDirectoryContent; - GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); - DiscoveredItemCount += LocalDirectoryContent.Directories.size(); - std::vector<std::filesystem::path> DirectoriesToDelete; - DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size()); - for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) - { - bool Leave = false; - for (const std::string_view ExcludeDirectory : ExcludeDirectories) - { - if
(LocalDirPath == (Path / ExcludeDirectory)) - { - Leave = true; - break; - } - } - if (!Leave) - { - DirectoriesToDelete.emplace_back(std::move(LocalDirPath)); - DiscoveredItemCount++; - } - } - uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); if (ProgressFunc && ProgressUpdateDelayMS != 0) @@ -528,6 +525,15 @@ CleanDirectory( { ZEN_TRACE_CPU("DeleteDirs"); + + std::sort(DirectoriesToDelete.begin(), + DirectoriesToDelete.end(), + [](const std::filesystem::path& Lhs, const std::filesystem::path& Rhs) { + auto DistanceLhs = std::distance(Lhs.begin(), Lhs.end()); + auto DistanceRhs = std::distance(Rhs.begin(), Rhs.end()); + return DistanceLhs > DistanceRhs; + }); + for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete) { if (AbortFlag) @@ -542,53 +548,48 @@ CleanDirectory( } } - { - std::error_code Ec; - zen::CleanDirectory(DirectoryToDelete, /*ForceRemoveReadOnlyFiles*/ true, Ec); - if (Ec) - { - Sleep(200); - Ec.clear(); - zen::CleanDirectory(DirectoryToDelete, /*ForceRemoveReadOnlyFiles*/ true, Ec); - } + const std::filesystem::path FullPath = Path / DirectoryToDelete; - if (!Ec) + std::error_code Ec; + RemoveDir(FullPath, Ec); + if (Ec) + { + for (size_t Retries = 0; Ec && Retries < 3; Retries++) { - RemoveDir(DirectoryToDelete, Ec); - for (size_t Retries = 0; Ec && Retries < 3; Retries++) + if (!IsDir(FullPath)) { - if (!IsDir(DirectoryToDelete)) - { - Ec.clear(); - break; - } - Sleep(100 + int(Retries * 50)); Ec.clear(); - RemoveDir(DirectoryToDelete, Ec); + break; } - } - if (Ec) - { - RwLock::ExclusiveLockScope __(ResultLock); - Result.FailedRemovePaths.push_back(std::make_pair(DirectoryToDelete, Ec)); - } - else - { - DeletedItemCount++; + Sleep(100 + int(Retries * 50)); + Ec.clear(); + RemoveDir(FullPath, Ec); } } + if (Ec) + { + RwLock::ExclusiveLockScope __(ResultLock); + Result.FailedRemovePaths.push_back(std::make_pair(FullPath, Ec)); /* record the Path-rooted directory that RemoveDir was actually called on, like the file entries */ + } + else + { + DeletedItemCount++; + } - uint64_t NowMs = Timer.GetElapsedTimeMs(); -
- if ((NowMs - LastUpdateTimeMs) >= ProgressUpdateDelayMS) + if (ProgressFunc) { - LastUpdateTimeMs = NowMs; + uint64_t NowMs = Timer.GetElapsedTimeMs(); + + if ((NowMs - LastUpdateTimeMs) > 0) + { + LastUpdateTimeMs = NowMs; - uint64_t Deleted = DeletedItemCount.load(); - uint64_t DeletedBytes = DeletedByteCount.load(); - uint64_t Discovered = DiscoveredItemCount.load(); - std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)); - ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag); + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + std::string Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)); + ProgressFunc(Details, Discovered, Discovered - Deleted, PauseFlag, AbortFlag); + } } } } @@ -625,4 +626,72 @@ CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, return false; } +#if ZEN_WITH_TESTS + +void +filesystemutils_forcelink() +{ +} + +namespace { + void GenerateFile(const std::filesystem::path& Path) { BasicFile _(Path, BasicFile::Mode::kTruncate); } +} // namespace + +TEST_CASE("filesystemutils.CleanDirectory") +{ + ScopedTemporaryDirectory TmpDir; + + CreateDirectories(TmpDir.Path() / ".keepme"); + GenerateFile(TmpDir.Path() / ".keepme" / "keep"); + GenerateFile(TmpDir.Path() / "deleteme1"); + GenerateFile(TmpDir.Path() / "deleteme2"); + GenerateFile(TmpDir.Path() / "deleteme3"); + CreateDirectories(TmpDir.Path() / ".keepmenot"); + CreateDirectories(TmpDir.Path() / "no.keepme"); + + CreateDirectories(TmpDir.Path() / "DeleteMe"); + GenerateFile(TmpDir.Path() / "DeleteMe" / "delete1"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete1"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "delete3"); + 
CreateDirectories(TmpDir.Path() / "CantDeleteMe" / ".keepme"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2" / "delete3"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme"); + CreateDirectories(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2"); + GenerateFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme"); + + WorkerThreadPool Pool(4); + std::atomic<bool> AbortFlag = false; /* explicit initial value, as for the other atomics in this file */ + std::atomic<bool> PauseFlag = false; + + CleanDirectory(Pool, AbortFlag, PauseFlag, TmpDir.Path(), std::vector<std::string>{".keepme"}, {}, 0); + + CHECK(IsDir(TmpDir.Path() / ".keepme")); + CHECK(IsFile(TmpDir.Path() / ".keepme" / "keep")); + CHECK(!IsFile(TmpDir.Path() / "deleteme1")); + CHECK(!IsFile(TmpDir.Path() / "deleteme2")); + CHECK(!IsFile(TmpDir.Path() / "deleteme3")); + CHECK(!IsDir(TmpDir.Path() / ".keepmenot")); /* created as a directory above, so assert on IsDir */ + CHECK(!IsDir(TmpDir.Path() / "no.keepme")); /* created as a directory above, so assert on IsDir */ + + CHECK(!IsDir(TmpDir.Path() / "DeleteMe")); + CHECK(!IsDir(TmpDir.Path() / "CantDeleteMe" / "DeleteMe2")); /* DeleteMe2 was created under CantDeleteMe */ + + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe" / ".keepme")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme")); + CHECK(IsDir(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept")); + CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept1")); + CHECK(IsFile(TmpDir.Path() / "CantDeleteMe2" / ".keepme" / "Kept" / "kept2")); + CHECK(!IsFile(TmpDir.Path() / "CantDeleteMe2" / "deleteme")); +} + +#endif + } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index
d78ee29c1..32c8bda01 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -141,7 +141,6 @@ public: bool EnableTargetFolderScavenging = true; bool ValidateCompletedSequences = true; std::vector<std::string> ExcludeFolders; - std::vector<std::string> ExcludeExtensions; uint64_t MaximumInMemoryPayloadSize = 512u * 1024u; bool PopulateCache = true; }; @@ -257,6 +256,17 @@ private: ChunkedFolderContent& OutScavengedLocalContent, ChunkedContentLookup& OutScavengedLookup); + void ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, + std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, + tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex, + const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters, + const ChunkedFolderContent& ScavengedContent, + const ChunkedContentLookup& ScavengedLookup, + std::vector<CopyChunkData>& InOutCopyChunkDatas, + uint32_t ScavengedContentIndex, + uint64_t& InOutChunkMatchingRemoteCount, + uint64_t& InOutChunkMatchingRemoteByteCount); + std::filesystem::path FindDownloadedChunk(const IoHash& ChunkHash); std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( diff --git a/src/zenremotestore/include/zenremotestore/filesystemutils.h b/src/zenremotestore/include/zenremotestore/filesystemutils.h index a6c88e5cb..cfd6f02e1 100644 --- a/src/zenremotestore/include/zenremotestore/filesystemutils.h +++ b/src/zenremotestore/include/zenremotestore/filesystemutils.h @@ -116,4 +116,6 @@ bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, std::atomic<bool>& PauseFlag, const std::filesystem::path& Directory); +void filesystemutils_forcelink(); // internal + } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h 
b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h index 044436509..a07ede6f6 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h @@ -56,7 +56,7 @@ private: const Oid m_BuildId; const Options m_Options; - Oid m_BuildPartId; + Oid m_BuildPartId = Oid::Zero; CbObject m_BuildObject; CbObject m_BuildPartsObject; CbObject m_OpsSectionObject; diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index b566e5bed..8be8eb0df 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/timer.h> +#include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> #include <zenremotestore/chunking/chunkedfile.h> @@ -266,6 +267,8 @@ namespace remotestore_impl { &DownloadStartMS, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadBlockChunks"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -386,7 +389,9 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = Chunks]() { + Chunks = std::vector<IoHash>(Chunks)]() { + ZEN_TRACE_CPU("DownloadBlock"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -439,7 +444,7 @@ namespace remotestore_impl { IgnoreMissingAttachments, OptionalContext, RetriesLeft, - Chunks = Chunks, + Chunks = std::move(Chunks), Bytes = std::move(BlockResult.Bytes)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) @@ -492,7 +497,7 @@ namespace remotestore_impl { {}); 
return; } - SharedBuffer BlockPayload = Compressed.Decompress(); + CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); if (!BlockPayload) { if (RetriesLeft > 0) @@ -542,7 +547,7 @@ namespace remotestore_impl { uint64_t BlockHeaderSize = 0; bool StoreChunksOK = IterateChunkBlock( - BlockPayload, + BlockPayload.Flatten(), [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize]( CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { @@ -648,6 +653,8 @@ namespace remotestore_impl { &Info, IgnoreMissingAttachments, OptionalContext]() { + ZEN_TRACE_CPU("DownloadAttachment"); + auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -694,6 +701,8 @@ namespace remotestore_impl { AttachmentSize, Bytes = std::move(AttachmentResult.Bytes), OptionalContext]() { + ZEN_TRACE_CPU("WriteAttachment"); + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -745,6 +754,8 @@ namespace remotestore_impl { Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { + ZEN_TRACE_CPU("CreateBlock"); + auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -917,6 +928,8 @@ namespace remotestore_impl { &LooseFileAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadAttachment"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1039,6 +1052,8 @@ namespace remotestore_impl { &BulkBlockAttachmentsToUpload, &Info, OptionalContext]() { + ZEN_TRACE_CPU("UploadChunk"); + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -1587,6 +1602,8 @@ BuildContainer(CidStore& ChunkStore, AllowChunking, &RemoteResult, OptionalContext]() { + ZEN_TRACE_CPU("PrepareChunk"); + auto _ = MakeGuard([&ResolveAttachmentsLatch] { 
ResolveAttachmentsLatch.CountDown(); }); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -1972,10 +1989,16 @@ BuildContainer(CidStore& ChunkStore, try { uint64_t FetchAttachmentsStartMS = Timer.GetElapsedTimeMs(); - std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; + std::unordered_set<IoHash, IoHash::Hasher> AddedAttachmentHashes; auto NewBlock = [&]() { - size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); - size_t ChunkCount = ChunksInBlock.size(); + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); + size_t ChunkCount = ChunksInBlock.size(); + std::vector<IoHash> ChunkRawHashes; + ChunkRawHashes.reserve(ChunkCount); + for (const std::pair<IoHash, FetchChunkFunc>& Chunk : ChunksInBlock) + { + ChunkRawHashes.push_back(Chunk.first); + } if (BuildBlocks) { remotestore_impl::CreateBlock(WorkerPool, @@ -1990,15 +2013,13 @@ BuildContainer(CidStore& ChunkStore, } else { - ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); + ZEN_INFO("Bulk group {} attachments", ChunkCount); OnBlockChunks(std::move(ChunksInBlock)); } { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); } uint64_t NowMS = Timer.GetElapsedTimeMs(); ZEN_INFO("Assembled block {} with {} chunks in {} ({})", @@ -2007,7 +2028,6 @@ BuildContainer(CidStore& ChunkStore, NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), NiceBytes(BlockSize)); FetchAttachmentsStartMS = NowMS; - BlockAttachmentHashes.clear(); ChunksInBlock.clear(); BlockSize = 0; GeneratedBlockCount++; @@ -2039,8 +2059,17 @@ BuildContainer(CidStore& ChunkStore, ZEN_ASSERT(InfoIt != UploadAttachments.end()); uint64_t PayloadSize = InfoIt->second.Size; - if 
(BlockAttachmentHashes.insert(AttachmentHash).second) + if (AddedAttachmentHashes.insert(AttachmentHash).second) { + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if (((BlockSize + PayloadSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) && + (CurrentOpKey != LastOpKey)) + { + NewBlock(); + } + } + if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { ChunksInBlock.emplace_back(std::make_pair( @@ -2079,10 +2108,6 @@ BuildContainer(CidStore& ChunkStore, } BlockSize += PayloadSize; - if ((BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) && (CurrentOpKey != LastOpKey)) - { - NewBlock(); - } LastOpKey = CurrentOpKey; ChunksAssembled++; } @@ -2123,9 +2148,17 @@ BuildContainer(CidStore& ChunkStore, const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex]; if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end()) { - if (BlockAttachmentHashes.insert(ChunkHash).second) + if (AddedAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; + uint32_t ChunkSize = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size); + if (BuildBlocks && ChunksInBlock.size() > 0) + { + if ((BlockSize + ChunkSize) > MaxBlockSize || (ChunksInBlock.size() + 1) > MaxChunksPerBlock) + { + NewBlock(); + } + } ChunksInBlock.emplace_back( std::make_pair(ChunkHash, [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( @@ -2136,13 +2169,6 @@ BuildContainer(CidStore& ChunkStore, OodleCompressionLevel::None)}; })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; - if (BuildBlocks) - { - if (BlockSize >= MaxBlockSize || ChunksInBlock.size() > MaxChunksPerBlock) - { - NewBlock(); - } - } ChunksAssembled++; } ChunkedHashes.erase(FindIt); @@ -2781,12 +2807,26 @@ ParseOplogContainer(const CbObject& ContainerObject, for (CbFieldView OpEntry : OpsArray) { 
OpEntry.IterateAttachments([&](CbFieldView FieldView) { OpsAttachments.insert(FieldView.AsAttachment()); }); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } } } { std::vector<IoHash> ReferencedAttachments(OpsAttachments.begin(), OpsAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } + + if (remotestore_impl::IsCancelled(OptionalContext)) + { + return RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::OK), + .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, + .Reason = "Operation cancelled"}; + } + remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", OpsAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); @@ -3206,6 +3246,8 @@ LoadOplog(CidStore& ChunkStore, IgnoreMissingAttachments, &Info, OptionalContext]() { + ZEN_TRACE_CPU("DechunkAttachment"); + auto _ = MakeGuard([&DechunkLatch, &TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) @@ -3232,7 +3274,7 @@ LoadOplog(CidStore& ChunkStore, { BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { @@ -3255,15 +3297,80 @@ LoadOplog(CidStore& ChunkStore, } return; } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) + + IoHash RawHash; + uint64_t RawSize; + + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); + if (RawHash != ChunkHash) { - MemoryView SegmentData 
= Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", + RawHash, + ChunkHash, + Chunked.RawHash)); + } + return; + } + + { + ZEN_TRACE_CPU("DecompressChunk"); + + if (!Compressed.DecompressToStream(0, + RawSize, + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + + for (const SharedBuffer& Segment : + RangeBuffer.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), + SegmentData.GetSize(), + ChunkOffset + Offset); + } + return true; + })) + { + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Failed to decompress chunk {} for chunked attachment {}", + ChunkHash, + Chunked.RawHash)); + } + return; + } } + ChunkOffset += RawSize; } BLAKE3 RawHash = HashingStream.GetHash(); ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); diff --git 
a/src/zenremotestore/zenremotestore.cpp b/src/zenremotestore/zenremotestore.cpp index e074455b3..7f785599f 100644 --- a/src/zenremotestore/zenremotestore.cpp +++ b/src/zenremotestore/zenremotestore.cpp @@ -5,6 +5,7 @@ #include <zenremotestore/builds/buildsavedstate.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> +#include <zenremotestore/filesystemutils.h> #include <zenremotestore/projectstore/remoteprojectstore.h> #if ZEN_WITH_TESTS @@ -19,6 +20,7 @@ zenremotestore_forcelinktests() chunkedcontent_forcelink(); chunkedfile_forcelink(); chunkedcontent_forcelink(); + filesystemutils_forcelink(); remoteprojectstore_forcelink(); } diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 42296cbe1..418fc7978 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -61,6 +61,10 @@ zen::ZenServerEnvironment TestEnv; int main(int argc, char** argv) { +# if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +# endif // ZEN_PLATFORM_WINDOWS + using namespace std::literals; using namespace zen; diff --git a/src/zenserver/diag/diagsvcs.cpp b/src/zenserver/diag/diagsvcs.cpp index 8abf6e8a3..d8d53b0e3 100644 --- a/src/zenserver/diag/diagsvcs.cpp +++ b/src/zenserver/diag/diagsvcs.cpp @@ -28,30 +28,6 @@ GetHealthTag() using namespace std::literals; -static bool -ReadLogFile(const std::string& Path, StringBuilderBase& Out) -{ - try - { - constexpr auto ReadSize = std::size_t{4096}; - auto FileStream = std::ifstream{Path}; - - std::string Buf(ReadSize, '\0'); - while (FileStream.read(&Buf[0], ReadSize)) - { - Out.Append(std::string_view(&Buf[0], FileStream.gcount())); - } - Out.Append(std::string_view(&Buf[0], FileStream.gcount())); - - return true; - } - catch (const std::exception&) - { - Out.Reset(); - return false; - } -} - HttpHealthService::HttpHealthService() { ZEN_MEMSCOPE(GetHealthTag()); @@ -95,10 +71,9 @@ 
HttpHealthService::HttpHealthService() return m_HealthInfo.AbsLogPath.empty() ? m_HealthInfo.DataRoot / "logs/zenserver.log" : m_HealthInfo.AbsLogPath; }(); - ExtendableStringBuilder<4096> Sb; - if (ReadLogFile(Path.string(), Sb) && Sb.Size() > 0) + if (IoBuffer LogBuffer = IoBufferBuilder::MakeFromFile(Path)) { - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, Sb.ToView()); + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, LogBuffer); } else { diff --git a/src/zenserver/frontend/html.zip b/src/zenserver/frontend/html.zip Binary files differ index f9fc8a8ef..36f08a05d 100644 --- a/src/zenserver/frontend/html.zip +++ b/src/zenserver/frontend/html.zip diff --git a/src/zenserver/frontend/html/util/component.js b/src/zenserver/frontend/html/util/component.js index 205aa038e..3c4780d77 100644 --- a/src/zenserver/frontend/html/util/component.js +++ b/src/zenserver/frontend/html/util/component.js @@ -72,7 +72,7 @@ class ComponentDom extends ComponentBase text(value) { value = (value == undefined) ? "undefined" : value.toString(); - this._element.innerHTML = (value != "") ? value : ""; + this._element.textContent = (value != "") ? 
value : ""; return this; } diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 34848c831..da256d06d 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -206,6 +206,10 @@ AppMain(int argc, char* argv[]) int test_main(int argc, char** argv) { +# if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +# endif // ZEN_PLATFORM_WINDOWS + zen::logging::InitializeLogging(); zen::logging::SetLogLevel(zen::logging::level::Debug); @@ -218,6 +222,10 @@ test_main(int argc, char** argv) int main(int argc, char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + using namespace zen; if (argc >= 2) diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index ab8dbb16b..95ea114bb 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -122,7 +122,9 @@ ZenServerBase::Initialize(const ZenServerConfig& ServerOptions, ZenServerState:: if (m_ServerMutex.Create(MutexName) == false) { - ThrowLastError(fmt::format("Failed to create mutex '{}'", MutexName).c_str()); + std::error_code Ec = MakeErrorCodeFromLastError(); + ZEN_WARN("Failed to create server mutex '{}'. 
Reason: '{}' ({})", MutexName, Ec.message(), Ec.value()); + return -1; } EnqueueSigIntTimer(); diff --git a/src/zenstore-test/zenstore-test.cpp b/src/zenstore-test/zenstore-test.cpp index 6df7162fd..c055dbb64 100644 --- a/src/zenstore-test/zenstore-test.cpp +++ b/src/zenstore-test/zenstore-test.cpp @@ -16,6 +16,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zenstore_forcelinktests(); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index f97c98e08..0542d1171 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con LargestSize = Max(LargestSize, Size); } - const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u); + const uint64_t MinSize = Max(LargestSize, 512u * 1024u); const uint64_t BufferSize = Min(TotalSize, MinSize); std::vector<uint8_t> Buffer(BufferSize); @@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); }); + if (Count > 1) { + if (Buffer.empty()) + { + Buffer.resize(BufferSize); + } MutableMemoryView WriteBuffer(Buffer.data(), RangeSize); for (size_t Index = 0; Index < Count; Index++) { @@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment)); } WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset); + m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); + } + else + { + MemoryView SourceBuffer = Datas[Offset]; + WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset); + m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed); } - - 
m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed); uint32_t ChunkOffset = AlignedInsertOffset; std::vector<BlockStoreLocation> Locations(Count); @@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con bool BlockStore::HasChunk(const BlockStoreLocation& Location) const { - ZEN_TRACE_CPU("BlockStore::TryGetChunk"); + ZEN_TRACE_CPU("BlockStore::HasChunk"); RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = BlockIt->second; Block) { InsertLock.ReleaseNow(); @@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const RwLock::SharedLockScope InsertLock(m_InsertLock); if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) { - if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + if (Ref<BlockStoreFile> Block = BlockIt->second; Block) { + InsertLock.ReleaseNow(); + IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size); if (Chunk.GetSize() == Location.Size) { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index a5de5c448..37a8c36b8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("CasContainer::FindChunk"); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); auto KeyIt = m_LocationMap.find(ChunkHash); if (KeyIt == m_LocationMap.end()) { return IoBuffer(); } - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); IoBuffer Chunk = m_BlockStore.TryGetChunk(Location); return Chunk; @@ -316,10 +317,11 @@ 
CasContainerStrategy::FindChunk(const IoHash& ChunkHash) bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope Lock(m_LocationMapLock); if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { - const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment); + Lock.ReleaseNow(); return m_BlockStore.HasChunk(Location); } return false; diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp index f1001f665..c5b27c1ea 100644 --- a/src/zenstore/projectstore.cpp +++ b/src/zenstore/projectstore.cpp @@ -3917,7 +3917,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); - ZEN_INFO("scrubbing '{}'", ProjectRootDir); + ZEN_INFO("scrubbing '{}'", m_OplogStoragePath); // Scrubbing needs to check all existing oplogs std::vector<std::string> OpLogs = ScanForOplogs(); diff --git a/src/zentelemetry-test/zentelemetry-test.cpp b/src/zentelemetry-test/zentelemetry-test.cpp index c8b067226..83fd549db 100644 --- a/src/zentelemetry-test/zentelemetry-test.cpp +++ b/src/zentelemetry-test/zentelemetry-test.cpp @@ -16,6 +16,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zentelemetry_forcelinktests(); diff --git a/src/zenutil-test/zenutil-test.cpp b/src/zenutil-test/zenutil-test.cpp index 3e3a11a01..f5cfd5a72 100644 --- a/src/zenutil-test/zenutil-test.cpp +++ b/src/zenutil-test/zenutil-test.cpp @@ -16,6 +16,10 @@ int main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_PLATFORM_WINDOWS + setlocale(LC_ALL, "en_us.UTF8"); +#endif // ZEN_PLATFORM_WINDOWS + #if ZEN_WITH_TESTS zen::zenutil_forcelinktests(); diff --git 
a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 4d10f3794..8901b7779 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -11,6 +11,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <spdlog/sinks/sink.h> ZEN_THIRD_PARTY_INCLUDES_END +#include <atomic> #include <filesystem> namespace zen::logging { @@ -248,7 +249,7 @@ private: const std::size_t m_MaxSize; const std::size_t m_MaxFiles; BasicFile m_CurrentFile; - bool m_NeedFlush = false; + std::atomic<bool> m_NeedFlush = false; }; } // namespace zen::logging |