aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-11-01 14:38:00 +0100
committerPer Larsson <[email protected]>2021-11-01 14:38:00 +0100
commit835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d (patch)
treee914510031269ec33e364d7d553316ddb201b059
parentFirst pass batch request. (diff)
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d.tar.xz
zen-835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d.zip
Merge branch 'main' into zcache-batch
-rw-r--r--zencore/filesystem.cpp2
-rw-r--r--zencore/include/zencore/sharedbuffer.h6
-rw-r--r--zencore/memory.cpp4
-rw-r--r--zencore/xmake.lua2
-rw-r--r--zenserver-test/zenserver-test.cpp27
-rw-r--r--zenserver/cache/cacheagent.cpp5
-rw-r--r--zenserver/cache/cacheagent.h9
-rw-r--r--zenserver/cache/structuredcachestore.cpp215
-rw-r--r--zenserver/cache/structuredcachestore.h93
-rw-r--r--zenserver/config.cpp7
-rw-r--r--zenserver/config.h1
-rw-r--r--zenserver/diag/formatters.h20
-rw-r--r--zenserver/upstream/jupiter.cpp10
-rw-r--r--zenserver/upstream/upstreamcache.cpp195
-rw-r--r--zenserver/zenserver.cpp154
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
-rw-r--r--zenstore/basicfile.cpp48
-rw-r--r--zenstore/include/zenstore/basicfile.h18
-rw-r--r--zenutil/include/zenutil/zenserverprocess.h16
-rw-r--r--zenutil/zenserverprocess.cpp6
21 files changed, 583 insertions, 263 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index b8e8eff55..a642e2cf6 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -840,7 +840,7 @@ TEST_CASE("filesystem")
// GetExePath -- this is not a great test as it's so dependent on where the this code gets linked in
path BinPath = GetRunningExecutablePath();
- const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test" || BinPath.stem() == "zenstore-test";
+ const bool ExpectedExe = ToUtf8(BinPath.stem().native()).ends_with("-test"sv) || BinPath.stem() == "zenserver";
CHECK(ExpectedExe);
CHECK(is_regular_file(BinPath));
diff --git a/zencore/include/zencore/sharedbuffer.h b/zencore/include/zencore/sharedbuffer.h
index 640c3fe74..1f87dc639 100644
--- a/zencore/include/zencore/sharedbuffer.h
+++ b/zencore/include/zencore/sharedbuffer.h
@@ -143,6 +143,12 @@ public:
/** Make a non-owned view of the input */
[[nodiscard]] inline static SharedBuffer MakeView(MemoryView View) { return MakeView(View.GetData(), View.GetSize()); }
+ /** Make a non-owning view of the memory of the contiguous container. */
+ [[nodiscard]] inline static SharedBuffer MakeView(const std::ranges::contiguous_range auto& Container)
+ {
+ std::span Span = Container;
+ return MakeView(Span.data(), Span.size() * sizeof(typename decltype(Span)::element_type));
+ }
/** Make a non-owned view of the input */
[[nodiscard]] ZENCORE_API static SharedBuffer MakeView(const void* Data, uint64_t Size);
/** Make a non-owned view of the input */
diff --git a/zencore/memory.cpp b/zencore/memory.cpp
index 62c81076d..14ea7ca1d 100644
--- a/zencore/memory.cpp
+++ b/zencore/memory.cpp
@@ -185,13 +185,13 @@ TEST_CASE("ChunkingLinearAllocator")
TEST_CASE("MemoryView")
{
{
- uint8_t Array1[16];
+ uint8_t Array1[16] = {};
MemoryView View1 = MakeMemoryView(Array1);
CHECK(View1.GetSize() == 16);
}
{
- uint32_t Array2[16];
+ uint32_t Array2[16] = {};
MemoryView View2 = MakeMemoryView(Array2);
CHECK(View2.GetSize() == 64);
}
diff --git a/zencore/xmake.lua b/zencore/xmake.lua
index 5de7f476d..0b0b51758 100644
--- a/zencore/xmake.lua
+++ b/zencore/xmake.lua
@@ -8,7 +8,9 @@ target('zencore')
"vcpkg::spdlog",
"vcpkg::fmt",
"vcpkg::doctest",
+ "vcpkg::json11",
"vcpkg::lz4",
+ "vcpkg::mimalloc",
"vcpkg::cpr",
"vcpkg::curl", -- required by cpr
"vcpkg::zlib", -- required by curl
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 639f40689..876979914 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -66,6 +66,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#endif
using namespace fmt::literals;
+using namespace std::literals;
/*
@@ -126,10 +127,10 @@ public:
m_Headers.reserve(16);
zen::ExtendableStringBuilder<256> RequestBody;
- RequestBody << "GET " << Path << " HTTP/1.1\r\n";
- RequestBody << "Host: " << Server << "\r\n";
- RequestBody << "Accept: */*\r\n";
- RequestBody << "Connection: " << (m_KeepAlive ? "keep-alive" : "close") << "\r\n\r\n"; // TODO: support keep-alive
+ RequestBody << "GET "sv << Path << " HTTP/1.1\r\n"sv;
+ RequestBody << "Host: "sv << Server << "\r\n"sv;
+ RequestBody << "Accept: */*\r\n"sv;
+ RequestBody << "Connection: "sv << (m_KeepAlive ? "keep-alive"sv : "close"sv) << "\r\n\r\n"sv; // TODO: support keep-alive
m_RequestBody = RequestBody;
@@ -460,12 +461,12 @@ using namespace spdlog;
using namespace spdlog::details;
using namespace std::literals;
-class full_formatter final : public spdlog::formatter
+class full_test_formatter final : public spdlog::formatter
{
public:
- full_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {}
+ full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {}
- virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_formatter>(m_LogId, m_Epoch); }
+ virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); }
static constexpr bool UseDate = false;
@@ -680,7 +681,7 @@ main(int argc, char** argv)
zen::logging::InitializeLogging();
spdlog::set_level(spdlog::level::debug);
- spdlog::set_formatter(std::make_unique<::logging::full_formatter>("test", std::chrono::system_clock::now()));
+ spdlog::set_formatter(std::make_unique<::logging::full_test_formatter>("test", std::chrono::system_clock::now()));
std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path();
std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
@@ -1704,11 +1705,12 @@ TEST_CASE("zcache.policy")
zen::IoHash Key;
zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
// Store package locally
{
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
CHECK(Package.GetAttachments().size() != 0);
cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
@@ -1761,11 +1763,12 @@ TEST_CASE("zcache.policy")
zen::IoHash Key;
zen::IoHash PayloadId;
- zen::CbPackage Package = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(Package);
// Store package upstream
{
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
CHECK(Package.GetAttachments().size() != 0);
cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
diff --git a/zenserver/cache/cacheagent.cpp b/zenserver/cache/cacheagent.cpp
deleted file mode 100644
index f4d1cabe6..000000000
--- a/zenserver/cache/cacheagent.cpp
+++ /dev/null
@@ -1,5 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "cacheagent.h"
-
-#include <gsl/gsl-lite.hpp>
diff --git a/zenserver/cache/cacheagent.h b/zenserver/cache/cacheagent.h
deleted file mode 100644
index 145d0f79f..000000000
--- a/zenserver/cache/cacheagent.h
+++ /dev/null
@@ -1,9 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-class CacheAgent
-{
-public:
-private:
-};
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index e7b840ed8..119062833 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -6,18 +6,26 @@
#include <zencore/windows.h>
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
-#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
#include <concepts>
#include <filesystem>
+#include <ranges>
#include <unordered_map>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -31,7 +39,7 @@ namespace zen {
using namespace fmt::literals;
-ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir}
+ZenCacheStore::ZenCacheStore(const std::filesystem::path& RootDir) : m_DiskLayer{RootDir}
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
@@ -125,7 +133,8 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
void
ZenCacheStore::GarbageCollect(GcContext& GcCtx)
{
- ZEN_UNUSED(GcCtx);
+ m_DiskLayer.GarbageCollect(GcCtx);
+ m_MemLayer.GarbageCollect(GcCtx);
}
//////////////////////////////////////////////////////////////////////////
@@ -150,7 +159,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
return false;
}
- CacheBucket* Bucket = Bucket = &it->second;
+ CacheBucket* Bucket = &it->second;
_.ReleaseNow();
@@ -213,7 +222,12 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
void
ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
{
- ZEN_UNUSED(GcCtx);
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second.GarbageCollect(GcCtx);
+ }
}
void
@@ -302,110 +316,50 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
//////////////////////////////////////////////////////////////////////////
-#pragma pack(push)
-#pragma pack(1)
+inline DiskLocation::DiskLocation() = default;
-struct DiskLocation
+inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+, LowerSize(ValueSize & 0xFFFFffff)
+, IndexDataSize(IndexSize)
{
- inline DiskLocation() = default;
+}
- inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
+inline uint64_t DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags)
+{
+ return Offset | Flags;
+}
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+inline uint64_t DiskLocation::Offset() const
+{
+ return OffsetAndFlags & kOffsetMask;
+}
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+inline uint64_t DiskLocation::Size() const
+{
+ return LowerSize;
+}
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
+inline uint64_t DiskLocation::IsFlagSet(uint64_t Flag) const
+{
+ return OffsetAndFlags & Flag;
+}
- if (IsFlagSet(DiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
+inline ZenContentType DiskLocation::GetContentType() const
+{
+ ZenContentType ContentType = ZenContentType::kBinary;
- return ContentType;
+ if (IsFlagSet(DiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
}
-private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
-};
-
-struct DiskIndexEntry
-{
- IoHash Key;
- DiskLocation Location;
-};
-
-#pragma pack(pop)
+ return ContentType;
+}
-static_assert(sizeof(DiskIndexEntry) == 36);
+//////////////////////////////////////////////////////////////////////////
-struct ZenCacheDiskLayer::CacheBucket
-{
- CacheBucket(CasStore& Cas);
- ~CacheBucket();
-
- void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- static bool Delete(std::filesystem::path BucketDir);
-
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
- void Drop();
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
-
- inline bool IsOk() const { return m_IsOk; }
-
-private:
- CasStore& m_CasStore;
- std::filesystem::path m_BucketDir;
- Oid m_BucketId;
- bool m_IsOk = false;
- uint64_t m_LargeObjectThreshold = 64 * 1024;
-
- // These files are used to manage storage of small objects for this bucket
-
- BasicFile m_SobsFile;
- TCasLogFile<DiskIndexEntry> m_SlogFile;
-
- RwLock m_IndexLock;
- tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
- uint64_t m_WriteCursor = 0;
-
- void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
- bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
-
- // These locks are here to avoid contention on file creation, therefore it's sufficient
- // that we take the same lock for the same hash
- //
- // These locks are small and should really be spaced out so they don't share cache lines,
- // but we don't currently access them at particularly high frequency so it should not be
- // an issue in practice
-
- RwLock m_ShardedLocks[256];
- inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
-};
-
-ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
+ZenCacheDiskLayer::CacheBucket::CacheBucket()
{
}
@@ -839,7 +793,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas)
+ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir)
{
}
@@ -873,7 +827,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
}
else
{
- auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+ auto It = m_Buckets.try_emplace(std::string(InBucket));
Bucket = &It.first->second;
std::filesystem::path BucketPath = m_RootDir;
@@ -916,7 +870,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
}
else
{
- auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore);
+ auto It = m_Buckets.try_emplace(std::string(InBucket));
Bucket = &It.first->second;
std::filesystem::path bucketPath = m_RootDir;
@@ -972,7 +926,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
}
else
{
- auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore);
+ auto InsertResult = m_Buckets.try_emplace(BucketName8);
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName8;
@@ -1050,7 +1004,64 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
void
ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx)
{
- ZEN_UNUSED(GcCtx);
+ RwLock::SharedLockScope _(m_Lock);
+
+ for (auto& Kv : m_Buckets)
+ {
+ Kv.second.GarbageCollect(GcCtx);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("z$.store")
+{
+ using namespace fmt::literals;
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+
+ ZenCacheStore Zcs(TempDir.Path() / "cache");
+
+ const int kIterationCount = 100;
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ CbObjectWriter Cbo;
+ Cbo << "hey" << i;
+ CbObject Obj = Cbo.Save();
+
+ ZenCacheValue Value;
+ Value.Value = Obj.GetBuffer().AsIoBuffer();
+ Value.Value.SetContentType(ZenContentType::kCbObject);
+
+ Zcs.Put("test_bucket"sv, Key, Value);
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ const IoHash Key = IoHash::HashBuffer(&i, sizeof i);
+
+ ZenCacheValue Value;
+ Zcs.Get("test_bucket"sv, Key, /* out */ Value);
+
+ REQUIRE(Value.Value);
+ CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject);
+ CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None);
+ CbObject Obj = LoadCompactBinaryObject(Value.Value);
+ CHECK_EQ(Obj["hey"].AsInt32(), i);
+ }
+}
+
+#endif
+
+void
+z$_forcelink()
+{
}
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 4753af627..6521d8393 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -7,7 +7,9 @@
#include <zencore/iohash.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
#pragma warning(push)
#pragma warning(disable : 4127)
@@ -98,10 +100,46 @@ private:
Configuration m_Configuration;
};
+#pragma pack(push)
+#pragma pack(1)
+
+struct DiskLocation
+{
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+
+ DiskLocation();
+ DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags);
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags);
+ uint64_t Offset() const;
+ uint64_t Size() const;
+ uint64_t IsFlagSet(uint64_t Flag) const;
+ ZenContentType GetContentType() const;
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+};
+
+struct DiskIndexEntry
+{
+ IoHash Key;
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 36);
+
class ZenCacheDiskLayer
{
public:
- ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir);
+ explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir);
~ZenCacheDiskLayer();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -117,9 +155,54 @@ private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket;
+ struct CacheBucket
+ {
+ CacheBucket();
+ ~CacheBucket();
+
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ static bool Delete(std::filesystem::path BucketDir);
+
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ inline bool IsOk() const { return m_IsOk; }
+
+ private:
+ std::filesystem::path m_BucketDir;
+ Oid m_BucketId;
+ bool m_IsOk = false;
+ uint64_t m_LargeObjectThreshold = 64 * 1024;
+
+ // These files are used to manage storage of small objects for this bucket
+
+ BasicFile m_SobsFile;
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
+ };
- CasStore& m_CasStore;
std::filesystem::path m_RootDir;
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive
@@ -128,7 +211,7 @@ private:
class ZenCacheStore
{
public:
- ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir);
+ explicit ZenCacheStore(const std::filesystem::path& RootDir);
~ZenCacheStore();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -146,4 +229,6 @@ private:
uint64_t m_LastScrubTime = 0;
};
+void z$_forcelink();
+
} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index cbdbebb03..f512f8015 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -234,6 +234,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
+ "upstream-zen-dns",
+ "DNS that resolves to one or more Zen server instance(s)",
+ cxxopts::value<std::vector<std::string>>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Dns)->default_value(""),
+ "");
+
+ options.add_option("cache",
+ "",
"upstream-thread-count",
"Number of threads used for upstream procsssing",
cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
diff --git a/zenserver/config.h b/zenserver/config.h
index 2a5a17fb9..72a4f31bb 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -42,6 +42,7 @@ struct ZenUpstreamJupiterConfig
struct ZenUpstreamZenConfig
{
std::vector<std::string> Urls;
+ std::vector<std::string> Dns;
};
enum class UpstreamCachePolicy : uint8_t
diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h
index 42f928efe..759df58d3 100644
--- a/zenserver/diag/formatters.h
+++ b/zenserver/diag/formatters.h
@@ -2,6 +2,11 @@
#pragma once
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/iobuffer.h>
+#include <zencore/string.h>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
#include <fmt/format.h>
@@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response>
{
using namespace std::literals;
- if (Response.status_code == 200)
+ if (Response.status_code == 200 || Response.status_code == 201)
{
return fmt::format_to(Ctx.out(),
"Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
@@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response>
const auto It = Response.header.find("Content-Type");
const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv;
- const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv ||
- ContentType == "application/octet-stream";
-
- if (IsBinary)
+ if (ContentType == "application/x-ue-cb"sv)
{
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::CbObjectView Obj(Body.Data());
+ zen::ExtendableStringBuilder<256> Sb;
+ std::string_view Json = Obj.ToJson(Sb).ToView();
+
return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'",
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'",
Response.url.str(),
Response.status_code,
Response.uploaded_bytes,
Response.downloaded_bytes,
Response.elapsed,
+ Json,
Response.reason);
}
else
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 556a2124d..7a36b5841 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -87,12 +87,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -137,6 +137,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -172,6 +173,7 @@ CloudCacheSession::GetBlob(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -203,6 +205,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -234,6 +237,7 @@ CloudCacheSession::GetObject(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -383,6 +387,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
{"X-Jupiter-IoHash", RefHash.ToHexString()},
{"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -546,6 +551,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 168449d05..c0cd858b6 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -175,8 +175,10 @@ namespace detail {
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
+ using namespace fmt::literals;
+
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const size_t MaxAttempts = 3;
try
{
@@ -204,117 +206,112 @@ namespace detail {
}
else
{
- bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
- {
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
+ for (const IoHash& PayloadId : PayloadIds)
{
- if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- Result.Success)
+ const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+
+ if (It == std::end(CacheRecord.PayloadIds))
{
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
- break;
+ OutReason = "payload '{}' MISSING from local cache"_format(PayloadId);
+ return false;
}
- }
- if (!Success)
- {
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+
+ CloudCacheResult BlobResult;
+ for (size_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ if (!BlobResult.Success)
+ {
+ OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason);
+ return false;
+ }
+
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
}
+
+ return true;
+ };
+
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ if (!RefResult.Success)
{
- if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- Result.Success)
- {
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
+ return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RefResult.Reason),
+ .Success = false};
+ }
- if (!Result.Needs.empty())
- {
- for (const IoHash& NeededHash : Result.Needs)
- {
- Success = false;
-
- if (auto It =
- std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
- It != std::end(CacheRecord.PayloadIds))
- {
- const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
-
- if (CloudCacheResult BlobResult =
- Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- BlobResult.Success)
- {
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- Success = true;
- }
- else
- {
- ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
- else
- {
- ZEN_WARN("needed payload '{}/{}/{}' MISSING",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- if (FinalizeRefResult FinalizeResult =
- Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
- FinalizeResult.Success)
- {
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
- Success = true;
-
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MissingHash);
- }
- }
- else
- {
- ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- Success = false;
- }
- }
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
+
+ if (!FinalizeResult.Needs.empty())
+ {
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
+
+ FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
- if (Success)
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
{
- break;
+ Sb << MissingHash.ToHexString() << ",";
}
+
+ return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Sb.ToString()),
+ .Success = false};
}
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
}
}
catch (std::exception& Err)
@@ -890,6 +887,15 @@ private:
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
}
@@ -907,7 +913,10 @@ private:
}
catch (std::exception& e)
{
- ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ e.what());
}
}
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index de30c5425..9a8090fc0 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -8,11 +8,13 @@
#include <zencore/logging.h>
#include <zencore/refcount.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenhttp/httpserver.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
#include <zenutil/zenserverprocess.h>
@@ -115,6 +117,37 @@ namespace zen {
using namespace std::literals;
using namespace fmt::literals;
+namespace utils {
+ asio::error_code ResolveHostname(asio::io_context& Ctx,
+ std::string_view Host,
+ std::string_view DefaultPort,
+ std::vector<std::string>& OutEndpoints)
+ {
+ std::string_view Port = DefaultPort;
+
+ if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos)
+ {
+ Port = Host.substr(Idx + 1);
+ Host = Host.substr(0, Idx);
+ }
+
+ asio::ip::tcp::resolver Resolver(Ctx);
+
+ asio::error_code ErrorCode;
+ asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode);
+
+ if (!ErrorCode)
+ {
+ for (const asio::ip::tcp::endpoint& Ep : Endpoints)
+ {
+ OutEndpoints.push_back("http://{}:{}"_format(Ep.address().to_string(), Ep.port()));
+ }
+ }
+
+ return ErrorCode;
+ }
+} // namespace utils
+
class ZenServer : public IHttpStatusProvider
{
public:
@@ -295,11 +328,13 @@ public:
const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
- m_CurrentState = kRunning;
+ SetNewState(kRunning);
+
+ OnReady();
m_Http->Run(IsInteractiveMode);
- m_CurrentState = kShuttingDown;
+ SetNewState(kShuttingDown);
ZEN_INFO(ZEN_APP_NAME " exiting");
@@ -321,6 +356,10 @@ public:
void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; }
void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
+ std::function<void()> m_IsReadyFunc;
+ void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); }
+ void OnReady();
+
void EnsureIoRunner()
{
if (!m_IoRunner.joinable())
@@ -434,6 +473,8 @@ private:
kShuttingDown
} m_CurrentState = kInitializing;
+ inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
+
std::string_view ToString(ServerState Value)
{
switch (Value)
@@ -475,6 +516,17 @@ private:
};
void
+ZenServer::OnReady()
+{
+ m_ServerEntry->SignalReady();
+
+ if (m_IsReadyFunc)
+ {
+ m_IsReadyFunc();
+ }
+}
+
+void
ZenServer::InitializeState(ZenServiceConfig& ServiceConfig)
{
// Check root manifest to deal with schema versioning
@@ -575,7 +627,7 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
+ m_CacheStore = std::make_unique<ZenCacheStore>(m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
@@ -595,12 +647,29 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
- if (!UpstreamConfig.ZenConfig.Urls.empty())
+ // Zen upstream
{
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls);
- UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls;
+ if (!UpstreamConfig.ZenConfig.Dns.empty())
+ {
+ for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns)
+ {
+ const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls);
+ if (Err)
+ {
+ ZEN_ERROR("resolve '{}' FAILED, reason '{}'", Err.message());
+ }
+ }
+ }
+
+ if (!ZenUrls.empty())
+ {
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls);
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
+ }
}
+ // Jupiter upstream
{
zen::CloudCacheClientOptions Options;
if (UpstreamConfig.JupiterConfig.UseProductionSettings)
@@ -681,6 +750,7 @@ public:
private:
ZenServerOptions& m_GlobalOptions;
ZenServiceConfig& m_ServiceConfig;
+ zen::LockFile m_LockFile;
};
int
@@ -703,6 +773,32 @@ ZenWindowsService::Run()
try
{
+ // Mutual exclusion and synchronization
+
+ std::error_code Ec;
+
+ std::filesystem::path LockFilePath = GlobalOptions.DataDir / ".lock";
+
+ bool IsReady = false;
+
+ auto MakeLockData = [&] {
+ CbObjectWriter Cbo;
+ Cbo << "pid" << _getpid() << "data" << ToUtf8(GlobalOptions.DataDir) << "port" << GlobalOptions.BasePort << "session_id"
+ << GetSessionId() << "ready" << IsReady;
+ return Cbo.Save();
+ };
+
+ m_LockFile.Create(LockFilePath, MakeLockData(), Ec);
+
+ if (Ec)
+ {
+ ConsoleLog().error("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message());
+
+ std::exit(99);
+ }
+
+ InitializeLogging(GlobalOptions);
+
// Prototype config system, we'll see how this pans out
//
// TODO: we need to report any parse errors here
@@ -764,13 +860,19 @@ ZenWindowsService::Run()
}});
// If we have a parent process, establish the mechanisms we need
- // to be able to communicate with the parent
+ // to be able to communicate readiness with the parent
- if (!GlobalOptions.ChildId.empty())
- {
- zen::NamedEvent ParentEvent{GlobalOptions.ChildId};
- ParentEvent.Set();
- }
+ Server.SetIsReadyFunc([&] {
+ IsReady = true;
+
+ m_LockFile.Update(MakeLockData(), Ec);
+
+ if (!GlobalOptions.ChildId.empty())
+ {
+ zen::NamedEvent ParentEvent{GlobalOptions.ChildId};
+ ParentEvent.Set();
+ }
+ });
Server.Run();
Server.Cleanup();
@@ -788,6 +890,22 @@ ZenWindowsService::Run()
return 0;
}
+#if ZEN_WITH_TESTS
+int
+test_main(int argc, char** argv)
+{
+ zen::zencore_forcelinktests();
+ zen::zenhttp_forcelinktests();
+ zen::zenstore_forcelinktests();
+
+ zen::logging::InitializeLogging();
+
+ spdlog::set_level(spdlog::level::debug);
+
+ return doctest::Context(argc, argv).run();
+}
+#endif
+
int
main(int argc, char* argv[])
{
@@ -797,6 +915,16 @@ main(int argc, char* argv[])
mi_version();
#endif
+#if ZEN_WITH_TESTS
+ if (argc >= 2)
+ {
+ if (argv[1] == "test"sv)
+ {
+ return test_main(argc, argv);
+ }
+ }
+#endif
+
try
{
ZenServerOptions GlobalOptions;
@@ -809,8 +937,6 @@ main(int argc, char* argv[])
std::filesystem::create_directories(GlobalOptions.DataDir);
}
- InitializeLogging(GlobalOptions);
-
#if ZEN_PLATFORM_WINDOWS
if (GlobalOptions.InstallService)
{
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index b670582e7..d954d3f8d 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -120,7 +120,6 @@
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="upstream\jupiter.h" />
<ClInclude Include="projectstore.h" />
- <ClInclude Include="cache\cacheagent.h" />
<ClInclude Include="testing\launch.h" />
<ClInclude Include="casstore.h" />
<ClInclude Include="diag\diagsvcs.h" />
@@ -142,7 +141,6 @@
<ClCompile Include="monitoring\httpstats.cpp" />
<ClCompile Include="monitoring\httpstatus.cpp" />
<ClCompile Include="projectstore.cpp" />
- <ClCompile Include="cache\cacheagent.cpp" />
<ClCompile Include="sos\sos.cpp" />
<ClCompile Include="testing\httptest.cpp" />
<ClCompile Include="upstream\jupiter.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index b87fa0016..04c6267ba 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -5,9 +5,6 @@
<ClInclude Include="projectstore.h" />
<ClInclude Include="casstore.h" />
<ClInclude Include="testing\launch.h" />
- <ClInclude Include="cache\cacheagent.h">
- <Filter>cache</Filter>
- </ClInclude>
<ClInclude Include="diag\diagsvcs.h">
<Filter>diag</Filter>
</ClInclude>
@@ -49,9 +46,6 @@
<ClCompile Include="zenserver.cpp" />
<ClCompile Include="projectstore.cpp" />
<ClCompile Include="casstore.cpp" />
- <ClCompile Include="cache\cacheagent.cpp">
- <Filter>cache</Filter>
- </ClCompile>
<ClCompile Include="experimental\usnjournal.cpp">
<Filter>experimental</Filter>
</ClCompile>
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index bbb9e1036..9ed70a5ec 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -2,6 +2,7 @@
#include "zenstore/basicfile.h"
+#include <zencore/compactbinary.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -266,6 +267,53 @@ TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::
std::filesystem::rename(m_TempPath, FinalFileName, Ec);
}
+//////////////////////////////////////////////////////////////////////////
+
+LockFile::LockFile()
+{
+}
+
+LockFile::~LockFile()
+{
+}
+
+void
+LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec)
+{
+ Ec.clear();
+
+ const DWORD dwCreationDisposition = CREATE_ALWAYS;
+ DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE | DELETE;
+ const DWORD dwShareMode = FILE_SHARE_READ;
+ const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE;
+ HANDLE hTemplateFile = nullptr;
+
+ HANDLE FileHandle = CreateFile(FileName.c_str(),
+ dwDesiredAccess,
+ dwShareMode,
+ /* lpSecurityAttributes */ nullptr,
+ dwCreationDisposition,
+ dwFlagsAndAttributes,
+ hTemplateFile);
+
+ if (FileHandle == INVALID_HANDLE_VALUE)
+ {
+ Ec = zen::MakeErrorCodeFromLastError();
+
+ return;
+ }
+
+ m_FileHandle = FileHandle;
+
+ BasicFile::Write(Payload.GetBuffer(), 0, Ec);
+}
+
+void
+LockFile::Update(CbObject Payload, std::error_code& Ec)
+{
+ BasicFile::Write(Payload.GetBuffer(), 0, Ec);
+}
+
/*
___________ __
\__ ___/___ _______/ |_ ______
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index 7ae35dea6..e4414787c 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -12,6 +12,8 @@
namespace zen {
+class CbObject;
+
/**
* Probably the most basic file abstraction in the universe
*
@@ -80,6 +82,22 @@ private:
using BasicFile::Open;
};
+/** Lock file abstraction
+
+ */
+
+class LockFile : protected BasicFile
+{
+public:
+ LockFile();
+ ~LockFile();
+
+ void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec);
+ void Update(CbObject Payload, std::error_code& Ec);
+
+private:
+};
+
ZENCORE_API void basicfile_forcelink();
} // namespace zen
diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h
index 09728aa1a..8a4f9604d 100644
--- a/zenutil/include/zenutil/zenserverprocess.h
+++ b/zenutil/include/zenutil/zenserverprocess.h
@@ -94,17 +94,22 @@ public:
struct ZenServerEntry
{
+ // NOTE: any changes to this should consider backwards compatibility
+ // which means you should not rearrange members only potentially
+ // add something to the end or use a different mechanism for
+ // additional state. For example, you can use the session ID
+ // to introduce additional named objects
std::atomic<uint32_t> Pid;
std::atomic<uint16_t> ListenPort;
std::atomic<uint16_t> Flags;
uint8_t SessionId[12];
- std::atomic<uint32_t> SponsorPids[32];
+ std::atomic<uint32_t> SponsorPids[8];
uint8_t Padding[12];
- uint8_t Padding2[96];
enum class FlagsEnum : uint16_t
{
- kShutdownPlease = 1 << 0
+ kShutdownPlease = 1 << 0,
+ kIsReady = 1 << 1,
};
FRIEND_ENUM_CLASS_FLAGS(FlagsEnum);
@@ -112,10 +117,11 @@ public:
Oid GetSessionId() const { return Oid::FromMemory(SessionId); }
void Reset();
void SignalShutdownRequest();
+ void SignalReady();
bool AddSponsorProcess(uint32_t Pid);
};
- static_assert(sizeof(ZenServerEntry) == 256);
+ static_assert(sizeof(ZenServerEntry) == 64);
void Initialize();
[[nodiscard]] bool InitializeReadOnly();
@@ -128,7 +134,7 @@ public:
private:
void* m_hMapFile = nullptr;
ZenServerEntry* m_Data = nullptr;
- int m_MaxEntryCount = 131072 / sizeof(ZenServerEntry);
+ int m_MaxEntryCount = 65536 / sizeof(ZenServerEntry);
ZenServerEntry* m_OurEntry = nullptr;
bool m_IsReadOnly = true;
};
diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp
index 55b592ab1..4098954a8 100644
--- a/zenutil/zenserverprocess.cpp
+++ b/zenutil/zenserverprocess.cpp
@@ -269,6 +269,12 @@ ZenServerState::ZenServerEntry::SignalShutdownRequest()
Flags |= uint16_t(FlagsEnum::kShutdownPlease);
}
+void
+ZenServerState::ZenServerEntry::SignalReady()
+{
+ Flags |= uint16_t(FlagsEnum::kIsReady);
+}
+
bool
ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd)
{