aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scripts/remote_build.py3
-rw-r--r--zencore/filesystem.cpp27
-rw-r--r--zencore/include/zencore/filesystem.h4
-rw-r--r--zencore/include/zencore/string.h4
-rw-r--r--zenserver/zenserver.cpp5
-rw-r--r--zenstore-test/zenstore-test.cpp8
-rw-r--r--zenstore/basicfile.cpp8
-rw-r--r--zenstore/compactcas.cpp169
-rw-r--r--zenstore/gc.cpp5
9 files changed, 137 insertions, 96 deletions
diff --git a/scripts/remote_build.py b/scripts/remote_build.py
index 70f9cf9bf..d814d4a66 100644
--- a/scripts/remote_build.py
+++ b/scripts/remote_build.py
@@ -108,13 +108,14 @@ def _local(args):
# Validate key file. Git's SSH uses OpenSSL which needs UNIX line-endings
if args.keyfile:
+ """
with open(args.keyfile, "rt") as key_file:
lines = [x.strip() for x in key_file]
with open(args.keyfile, "wb") as key_file:
for line in lines:
key_file.write(line.encode() + b"\n")
-
+ """
identity = ("-i", args.keyfile)
else:
identity = ()
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index e2778089b..7ff0dc45c 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -23,6 +23,7 @@
#if ZEN_PLATFORM_LINUX
# include <dirent.h>
# include <fcntl.h>
+# include <sys/resource.h>
# include <sys/stat.h>
# include <unistd.h>
#endif
@@ -31,6 +32,7 @@
# include <dirent.h>
# include <fcntl.h>
# include <libproc.h>
+# include <sys/resource.h>
# include <sys/stat.h>
# include <sys/syslimits.h>
# include <unistd.h>
@@ -986,6 +988,31 @@ GetRunningExecutablePath()
#endif // ZEN_PLATFORM_WINDOWS
}
+void
+MaximizeOpenFileCount()
+{
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ struct rlimit Limit;
+ int Error = getrlimit(RLIMIT_NOFILE, &Limit);
+ if (Error)
+ {
+ ZEN_WARN("failed getting rlimit RLIMIT_NOFILE, reason '{}'", zen::MakeErrorCode(Error).message());
+ }
+ else
+ {
+ struct rlimit NewLimit = Limit;
+ NewLimit.rlim_cur = NewLimit.rlim_max;
+ ZEN_INFO("changing RLIMIT_NOFILE from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max);
+
+ Error = setrlimit(RLIMIT_NOFILE, &NewLimit);
+ if (Error != 0)
+ {
+ ZEN_WARN("failed to set RLIMIT_NOFILE limits from rlim_cur = {}, rlim_max {} to rlim_cur = {}, rlim_max {}, reason '{}'", Limit.rlim_cur, Limit.rlim_max, NewLimit.rlim_cur, NewLimit.rlim_max, zen::MakeErrorCode(Error).message());
+ }
+ }
+#endif
+}
+
//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h
index d1a5f3e0c..a6e76eaa0 100644
--- a/zencore/include/zencore/filesystem.h
+++ b/zencore/include/zencore/filesystem.h
@@ -34,6 +34,10 @@ ZENCORE_API std::filesystem::path PathFromHandle(void* NativeHandle);
ZENCORE_API std::filesystem::path GetRunningExecutablePath();
+/** Set the max open file handle count to max allowed for the current process on Linux and MacOS
+ */
+ZENCORE_API void MaximizeOpenFileCount();
+
struct FileContents
{
std::vector<IoBuffer> Data;
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h
index e502120ac..027730063 100644
--- a/zencore/include/zencore/string.h
+++ b/zencore/include/zencore/string.h
@@ -597,7 +597,7 @@ ToHexNumber(const uint8_t* InputData, size_t ByteCount, char* OutString)
/// <param name="Value">Integer value type</param>
/// <param name="outString">Output buffer where resulting string is written</param>
void
-ToHexNumber(std::unsigned_integral auto Value, char* OutString)
+ToHexNumber(UnsignedIntegral auto Value, char* OutString)
{
ToHexNumber((const uint8_t*)&Value, sizeof(Value), OutString);
OutString[sizeof(Value) * 2] = 0;
@@ -611,7 +611,7 @@ ToHexNumber(std::unsigned_integral auto Value, char* OutString)
/// <param name="OutValue">Pointer to output value</param>
/// <returns>true if the input consisted of all valid hexadecimal characters</returns>
bool
-ParseHexNumber(const std::string HexString, std::unsigned_integral auto& OutValue)
+ParseHexNumber(const std::string HexString, UnsignedIntegral auto& OutValue)
{
return ParseHexNumber(HexString.c_str(), sizeof(OutValue) * 2, (uint8_t*)&OutValue);
}
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f81deb167..3ac5b9992 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -1018,6 +1018,8 @@ ZenEntryPoint::Run()
InitializeLogging(ServerOptions);
+ MaximizeOpenFileCount();
+
ZEN_INFO(ZEN_APP_NAME " - using lock file at '{}'", LockFilePath);
ZEN_INFO(ZEN_APP_NAME " - starting on port {}, version '{}'", ServerOptions.BasePort, ZEN_CFG_VERSION_BUILD_STRING_FULL);
@@ -1156,9 +1158,10 @@ test_main(int argc, char** argv)
zen::z$_forcelink();
zen::logging::InitializeLogging();
-
spdlog::set_level(spdlog::level::debug);
+ zen::MaximizeOpenFileCount();
+
return doctest::Context(argc, argv).run();
}
#endif
diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp
index e6bd92ab9..030b1159d 100644
--- a/zenstore-test/zenstore-test.cpp
+++ b/zenstore-test/zenstore-test.cpp
@@ -1,9 +1,16 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include <zencore/logging.h>
+#include <zencore/filesystem.h>
#include <zencore/zencore.h>
#include <zenstore/zenstore.h>
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+# include <sys/time.h>
+# include <sys/resource.h>
+# include <zencore/except.h>
+#endif
+
#if ZEN_WITH_TESTS
# define DOCTEST_CONFIG_IMPLEMENT
# include <zencore/testing.h>
@@ -17,6 +24,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
zen::zenstore_forcelinktests();
zen::logging::InitializeLogging();
+ zen::MaximizeOpenFileCount();
return doctest::Context(argc, argv).run();
#else
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index e795b67eb..8eb172a1c 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -352,14 +352,6 @@ BasicFile::SetFileSize(uint64_t FileSize)
ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
}
}
- if (FileSize > 0)
- {
- int Error = posix_fallocate(Fd, 0, (off_t)FileSize);
- if (Error)
- {
- ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
- }
- }
#else
int Fd = int(intptr_t(m_FileHandle));
if (ftruncate64(Fd, (off64_t)FileSize) < 0)
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index ce93a3320..920ed965f 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -1275,7 +1275,7 @@ CasContainerStrategy::MigrateLegacyData(bool CleanSource)
{
const LegacyCasDiskIndexEntry& Record(Entry.second);
- BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize());
+ BlockStoreLocation NewChunkLocation{WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()};
BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
LogEntries.push_back(
{.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
@@ -2272,113 +2272,121 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
CreateDirectories(CasConfig.RootDirectory);
- const uint64_t kChunkSize = 1048;
- const int32_t kChunkCount = 8192;
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 4096;
+ uint64_t ExpectedSize = 0;
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> Chunks;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> Chunks;
Chunks.reserve(kChunkCount);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- ChunkHashes.emplace_back(Hash);
- Chunks.emplace_back(Chunk);
+ while (true)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ if (Chunks.contains(Hash))
+ {
+ continue;
+ }
+ Chunks[Hash] = Chunk;
+ ExpectedSize += Chunk.Size();
+ break;
+ }
}
+ std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
Cas.Initialize("test", 32768, 16, true);
{
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- const IoBuffer& Chunk = Chunks[Idx];
- const IoHash& Hash = ChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ const IoHash& Hash = Chunk.first;
+ const IoBuffer& Buffer = Chunk.second;
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Buffer, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Buffer, Hash);
ZEN_ASSERT(InsertResult.New);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
}
+ WorkCompleted = 0;
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ CHECK_EQ(ExpectedSize, TotalSize);
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : Chunks)
{
- ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(ChunkHash);
- IoHash Hash = IoHash::HashBuffer(Chunk);
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, &Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Buffer);
CHECK(ChunkHash == Hash);
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < Chunks.size())
{
Sleep(1);
}
}
- std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes;
+ GcChunkHashes.reserve(Chunks.size());
+ for (const auto& Chunk : Chunks)
{
- std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
- std::vector<IoHash> NewChunkHashes;
- NewChunkHashes.reserve(kChunkCount);
- std::vector<IoBuffer> NewChunks;
+ GcChunkHashes.insert(Chunk.first);
+ }
+ {
+ WorkCompleted = 0;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> NewChunks;
NewChunks.reserve(kChunkCount);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- IoHash Hash = HashBuffer(Chunk);
- NewChunkHashes.emplace_back(Hash);
- NewChunks.emplace_back(Chunk);
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunks[Hash] = Chunk;
}
- RwLock ChunkHashesLock;
std::atomic_uint32_t AddedChunkCount;
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ for (const auto& Chunk : NewChunks)
{
- const IoBuffer& Chunk = NewChunks[Idx];
- const IoHash& Hash = NewChunkHashes[Idx];
- ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
- CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk, &AddedChunkCount]() {
+ Cas.InsertChunk(Chunk.second, Chunk.first);
AddedChunkCount.fetch_add(1);
+ WorkCompleted.fetch_add(1);
});
- ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
- IoHash ChunkHash = OldChunkHashes[Idx];
- IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
- if (Chunk)
+ }
+ for (const auto& Chunk : Chunks)
+ {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, Chunk]() {
+ IoHash ChunkHash = Chunk.first;
+ IoBuffer Buffer = Cas.FindChunk(ChunkHash);
+ if (Buffer)
{
- CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ CHECK(ChunkHash == IoHash::HashBuffer(Buffer));
}
+ WorkCompleted.fetch_add(1);
});
}
- while (AddedChunkCount.load() < kChunkCount)
+ while (AddedChunkCount.load() < NewChunks.size())
{
- std::vector<IoHash> AddedHashes;
- {
- RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
- }
// Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ for (const auto& Chunk : NewChunks)
{
- if (Cas.HaveChunk(ChunkHash))
+ if (Cas.HaveChunk(Chunk.first))
{
- GcChunkHashes.emplace(ChunkHash);
+ GcChunkHashes.emplace(Chunk.first);
}
}
std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
@@ -2409,54 +2417,57 @@ TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < NewChunks.size() + Chunks.size())
{
Sleep(1);
}
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const auto& Chunk : NewChunks)
{
- std::vector<IoHash> AddedHashes;
+ if (Cas.HaveChunk(Chunk.first))
{
- RwLock::ExclusiveLockScope _(ChunkHashesLock);
- AddedHashes.swap(NewChunkHashes);
+ GcChunkHashes.emplace(Chunk.first);
}
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const IoHash& ChunkHash : AddedHashes)
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
{
- if (Cas.HaveChunk(ChunkHash))
+ if (C < KeepHashes.size() - 1)
{
- GcChunkHashes.emplace(ChunkHash);
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
}
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ if (C + 3 < KeepHashes.size() - 1)
{
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
KeepHashes.pop_back();
}
- C++;
}
-
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
- Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ C++;
}
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
+ WorkCompleted = 0;
for (const IoHash& ChunkHash : GcChunkHashes)
{
- ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ ThreadPool.ScheduleWork([&Cas, &WorkCompleted, ChunkHash]() {
CHECK(Cas.HaveChunk(ChunkHash));
CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ WorkCompleted.fetch_add(1);
});
}
- while (ThreadPool.PendingWork() > 0)
+ while (WorkCompleted < GcChunkHashes.size())
{
Sleep(1);
}
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 287dfb48a..856f9af02 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -123,11 +123,6 @@ namespace {
{
return MakeErrorCodeFromLastError();
}
- int Error = posix_fallocate(Fd, 0, (off_t)Size);
- if (Error)
- {
- return MakeErrorCode(Error);
- }
# else
if (ftruncate64(Fd, (off64_t)Size) < 0)
{