diff options
| -rw-r--r-- | scripts/remote_build.py | 3 | ||||
| -rw-r--r-- | zencore/filesystem.cpp | 27 | ||||
| -rw-r--r-- | zencore/include/zencore/filesystem.h | 4 | ||||
| -rw-r--r-- | zencore/include/zencore/string.h | 4 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 5 | ||||
| -rw-r--r-- | zenstore-test/zenstore-test.cpp | 8 | ||||
| -rw-r--r-- | zenstore/basicfile.cpp | 8 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 169 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 5 |
9 files changed, 137 insertions, 96 deletions
diff --git a/scripts/remote_build.py b/scripts/remote_build.py index 70f9cf9bf..d814d4a66 100644 --- a/scripts/remote_build.py +++ b/scripts/remote_build.py @@ -108,13 +108,14 @@ def _local(args): # Validate key file. Git's SSH uses OpenSSL which needs UNIX line-endings if args.keyfile: + """ with open(args.keyfile, "rt") as key_file: lines = [x.strip() for x in key_file] with open(args.keyfile, "wb") as key_file: for line in lines: key_file.write(line.encode() + b"\n") - + """ identity = ("-i", args.keyfile) else: identity = () diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index e2778089b..7ff0dc45c 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -23,6 +23,7 @@ #if ZEN_PLATFORM_LINUX # include <dirent.h> # include <fcntl.h> +# include <sys/resource.h> # include <sys/stat.h> # include <unistd.h> #endif @@ -31,6 +32,7 @@ # include <dirent.h> # include <fcntl.h> # include <libproc.h> +# include <sys/resource.h> # include <sys/stat.h> # include <sys/syslimits.h> # include <unistd.h> @@ -986,6 +988,31 @@ GetRunningExecutablePath() #endif // ZEN_PLATFORM_WINDOWS } +void +MaximizeOpenFileCount() +{ +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + struct rlimit Limit; + int Error = getrlimit(RLIMIT_NOFILE, &Limit); + if (Error) + { + ZEN_WARN("failed getting rlimit RLIMIT_NOFILE, reason '{}'", zen::MakeErrorCode(Error).message()); + } + else + { + struct rlimit NewLimit = Limit; + NewLimit.rlim_cur = NewLimit.rlim_max; + ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max); + + Error = setrlimit(RLIMIT_NOFILE, &NewLimit); + if (Error != 0) + { + ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max, zen::MakeErrorCode(Error).message()); + } + } +#endif +} + ////////////////////////////////////////////////////////////////////////// // // Testing related code follows... diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h index d1a5f3e0c..a6e76eaa0 100644 --- a/zencore/include/zencore/filesystem.h +++ b/zencore/include/zencore/filesystem.h @@ -34,6 +34,10 @@ ZENCORE_API std::filesystem::path PathFromHandle(void* NativeHandle); ZENCORE_API std::filesystem::path GetRunningExecutablePath(); +/** Set the max open file handle count to max allowed for the current process on Linux and MacOS + */ +ZENCORE_API void MaximizeOpenFileCount(); + struct FileContents { std::vector<IoBuffer> Data; diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index e502120ac..027730063 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -597,7 +597,7 @@ ToHexNumber(const uint8_t* InputData, size_t ByteCount, char* OutString) /// <param name="Value">Integer value type</param> /// <param name="outString">Output buffer where resulting string is written</param> void -ToHexNumber(std::unsigned_integral auto Value, char* OutString) +ToHexNumber(UnsignedIntegral auto Value, char* OutString) { ToHexNumber((const uint8_t*)&Value, sizeof(Value), OutString); OutString[sizeof(Value) * 2] = 0; @@ -611,7 +611,7 @@ ToHexNumber(std::unsigned_integral auto Value, char* OutString) /// <param name="OutValue">Pointer to output value</param> /// <returns>true if the input consisted of all valid hexadecimal characters</returns> bool -ParseHexNumber(const std::string HexString, std::unsigned_integral auto& OutValue) +ParseHexNumber(const std::string HexString, UnsignedIntegral auto& OutValue) { return ParseHexNumber(HexString.c_str(), sizeof(OutValue) * 2, (uint8_t*)&OutValue); } diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f81deb167..3ac5b9992 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -1018,6 +1018,8 @@ ZenEntryPoint::Run() InitializeLogging(ServerOptions); + MaximizeOpenFileCount(); + ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath); ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL); @@ -1156,9 +1158,10 @@ test_main(int argc, char** argv) zen::z$_forcelink(); zen::logging::InitializeLogging(); - spdlog::set_level(spdlog::level::debug); + zen::MaximizeOpenFileCount(); + return doctest::Context(argc, argv).run(); } #endif diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp index e6bd92ab9..030b1159d 100644 --- a/zenstore-test/zenstore-test.cpp +++ b/zenstore-test/zenstore-test.cpp @@ -1,9 +1,16 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include <zencore/logging.h> +#include <zencore/filesystem.h> #include <zencore/zencore.h> #include <zenstore/zenstore.h> +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# include <sys/time.h> +# include <sys/resource.h> +# include <zencore/except.h> +#endif + #if ZEN_WITH_TESTS # define DOCTEST_CONFIG_IMPLEMENT # include <zencore/testing.h> @@ -17,6 +24,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) zen::zenstore_forcelinktests(); zen::logging::InitializeLogging(); + zen::MaximizeOpenFileCount(); return doctest::Context(argc, argv).run(); #else diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index e795b67eb..8eb172a1c 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -352,14 +352,6 @@ BasicFile::SetFileSize(uint64_t FileSize) ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); } } - if (FileSize > 0) - { - int Error = posix_fallocate(Fd, 0, (off_t)FileSize); - if (Error) - { - ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle))); - } - } #else int Fd = int(intptr_t(m_FileHandle)); if (ftruncate64(Fd, (off64_t)FileSize) < 0) diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index ce93a3320..920ed965f 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -1275,7 +1275,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource) { const LegacyCasDiskIndexEntry& Record(Entry.second); - BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()); + BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()}; BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); LogEntries.push_back( {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); @@ -2272,113 +2272,121 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1048; - const int32_t kChunkCount = 8192; + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 4096; + uint64_t ExpectedSize = 0; - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(kChunkCount); - std::vector<IoBuffer> Chunks; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks; Chunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = CreateChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - ChunkHashes.emplace_back(Hash); - Chunks.emplace_back(Chunk); + while (true) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + if (Chunks.contains(Hash)) + { + continue; + } + Chunks[Hash] = Chunk; + ExpectedSize += Chunk.Size(); + break; + } } + std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); Cas.Initialize("test", 32768, 16, true); { - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - const IoBuffer& Chunk = Chunks[Idx]; - const IoHash& Hash = ChunkHashes[Idx]; - ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + const IoHash& Hash = Chunk.first; + const IoBuffer& Buffer = Chunk.second; + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash); ZEN_ASSERT(InsertResult.New); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } } + WorkCompleted = 0; const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + CHECK_EQ(ExpectedSize, TotalSize); { - std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : Chunks) { - ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(ChunkHash); - IoHash Hash = IoHash::HashBuffer(Chunk); + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Buffer); CHECK(ChunkHash == Hash); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < Chunks.size()) { Sleep(1); } } - std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes; + GcChunkHashes.reserve(Chunks.size()); + for (const auto& Chunk : Chunks) { - std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); - std::vector<IoHash> NewChunkHashes; - NewChunkHashes.reserve(kChunkCount); - std::vector<IoBuffer> NewChunks; + GcChunkHashes.insert(Chunk.first); + } + { + WorkCompleted = 0; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks; NewChunks.reserve(kChunkCount); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { - IoBuffer Chunk = CreateChunk(kChunkSize); - IoHash Hash = HashBuffer(Chunk); - NewChunkHashes.emplace_back(Hash); - NewChunks.emplace_back(Chunk); + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunks[Hash] = Chunk; } - RwLock ChunkHashesLock; std::atomic_uint32_t AddedChunkCount; - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + for (const auto& Chunk : NewChunks) { - const IoBuffer& Chunk = NewChunks[Idx]; - const IoHash& Hash = NewChunkHashes[Idx]; - ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { - CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() { + Cas.InsertChunk(Chunk.second, Chunk.first); AddedChunkCount.fetch_add(1); + WorkCompleted.fetch_add(1); }); - ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { - IoHash ChunkHash = OldChunkHashes[Idx]; - IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); - if (Chunk) + } + for (const auto& Chunk : Chunks) + { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() { + IoHash ChunkHash = Chunk.first; + IoBuffer Buffer = Cas.FindChunk(ChunkHash); + if (Buffer) { - CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + CHECK(ChunkHash == IoHash::HashBuffer(Buffer)); } + WorkCompleted.fetch_add(1); }); } - while (AddedChunkCount.load() < kChunkCount) + while (AddedChunkCount.load() < NewChunks.size()) { - std::vector<IoHash> AddedHashes; - { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); - } // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + for (const auto& Chunk : NewChunks) { - if (Cas.HaveChunk(ChunkHash)) + if (Cas.HaveChunk(Chunk.first)) { - GcChunkHashes.emplace(ChunkHash); + GcChunkHashes.emplace(Chunk.first); } } std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); @@ -2409,54 +2417,57 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < NewChunks.size() + Chunks.size()) { Sleep(1); } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const auto& Chunk : NewChunks) { - std::vector<IoHash> AddedHashes; + if (Cas.HaveChunk(Chunk.first)) { - RwLock::ExclusiveLockScope _(ChunkHashesLock); - AddedHashes.swap(NewChunkHashes); + GcChunkHashes.emplace(Chunk.first); } - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const IoHash& ChunkHash : AddedHashes) + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) { - if (Cas.HaveChunk(ChunkHash)) + if (C < KeepHashes.size() - 1) { - GcChunkHashes.emplace(ChunkHash); + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 77 == 0 && C < KeepHashes.size() - 1) + if (C + 3 < KeepHashes.size() - 1) { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; KeepHashes.pop_back(); } - C++; } - - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); - Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + C++; } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } { + WorkCompleted = 0; for (const IoHash& ChunkHash : GcChunkHashes) { - ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() { CHECK(Cas.HaveChunk(ChunkHash)); CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + WorkCompleted.fetch_add(1); }); } - while (ThreadPool.PendingWork() > 0) + while (WorkCompleted < GcChunkHashes.size()) { Sleep(1); } diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 287dfb48a..856f9af02 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -123,11 +123,6 @@ namespace { { return MakeErrorCodeFromLastError(); } - int Error = posix_fallocate(Fd, 0, (off_t)Size); - if (Error) - { - return MakeErrorCode(Error); - } # else if (ftruncate64(Fd, (off64_t)Size) < 0) { |