diff options
| author | Per Larsson <[email protected]> | 2021-11-01 14:38:00 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-11-01 14:38:00 +0100 |
| commit | 835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d (patch) | |
| tree | e914510031269ec33e364d7d553316ddb201b059 | |
| parent | First pass batch request. (diff) | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d.tar.xz zen-835b1a1f2ab4cd7b4044759f1fd151cb8ad0a31d.zip | |
Merge branch 'main' into zcache-batch
| -rw-r--r-- | zencore/filesystem.cpp | 2 | ||||
| -rw-r--r-- | zencore/include/zencore/sharedbuffer.h | 6 | ||||
| -rw-r--r-- | zencore/memory.cpp | 4 | ||||
| -rw-r--r-- | zencore/xmake.lua | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 27 | ||||
| -rw-r--r-- | zenserver/cache/cacheagent.cpp | 5 | ||||
| -rw-r--r-- | zenserver/cache/cacheagent.h | 9 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 215 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 93 | ||||
| -rw-r--r-- | zenserver/config.cpp | 7 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/diag/formatters.h | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 10 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 195 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 154 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 | ||||
| -rw-r--r-- | zenstore/basicfile.cpp | 48 | ||||
| -rw-r--r-- | zenstore/include/zenstore/basicfile.h | 18 | ||||
| -rw-r--r-- | zenutil/include/zenutil/zenserverprocess.h | 16 | ||||
| -rw-r--r-- | zenutil/zenserverprocess.cpp | 6 |
21 files changed, 583 insertions, 263 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index b8e8eff55..a642e2cf6 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -840,7 +840,7 @@ TEST_CASE("filesystem") // GetExePath -- this is not a great test as it's so dependent on where the this code gets linked in path BinPath = GetRunningExecutablePath(); - const bool ExpectedExe = BinPath.stem() == "zencore-test" || BinPath.stem() == "zenserver-test" || BinPath.stem() == "zenstore-test"; + const bool ExpectedExe = ToUtf8(BinPath.stem().native()).ends_with("-test"sv) || BinPath.stem() == "zenserver"; CHECK(ExpectedExe); CHECK(is_regular_file(BinPath)); diff --git a/zencore/include/zencore/sharedbuffer.h b/zencore/include/zencore/sharedbuffer.h index 640c3fe74..1f87dc639 100644 --- a/zencore/include/zencore/sharedbuffer.h +++ b/zencore/include/zencore/sharedbuffer.h @@ -143,6 +143,12 @@ public: /** Make a non-owned view of the input */ [[nodiscard]] inline static SharedBuffer MakeView(MemoryView View) { return MakeView(View.GetData(), View.GetSize()); } + /** Make a non-owning view of the memory of the contiguous container. */ + [[nodiscard]] inline static SharedBuffer MakeView(const std::ranges::contiguous_range auto& Container) + { + std::span Span = Container; + return MakeView(Span.data(), Span.size() * sizeof(typename decltype(Span)::element_type)); + } /** Make a non-owned view of the input */ [[nodiscard]] ZENCORE_API static SharedBuffer MakeView(const void* Data, uint64_t Size); /** Make a non-owned view of the input */ diff --git a/zencore/memory.cpp b/zencore/memory.cpp index 62c81076d..14ea7ca1d 100644 --- a/zencore/memory.cpp +++ b/zencore/memory.cpp @@ -185,13 +185,13 @@ TEST_CASE("ChunkingLinearAllocator") TEST_CASE("MemoryView") { { - uint8_t Array1[16]; + uint8_t Array1[16] = {}; MemoryView View1 = MakeMemoryView(Array1); CHECK(View1.GetSize() == 16); } { - uint32_t Array2[16]; + uint32_t Array2[16] = {}; MemoryView View2 = MakeMemoryView(Array2); CHECK(View2.GetSize() == 64); } diff --git a/zencore/xmake.lua b/zencore/xmake.lua index 5de7f476d..0b0b51758 100644 --- a/zencore/xmake.lua +++ b/zencore/xmake.lua @@ -8,7 +8,9 @@ target('zencore') "vcpkg::spdlog", "vcpkg::fmt", "vcpkg::doctest", + "vcpkg::json11", "vcpkg::lz4", + "vcpkg::mimalloc", "vcpkg::cpr", "vcpkg::curl", -- required by cpr "vcpkg::zlib", -- required by curl diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 639f40689..876979914 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -66,6 +66,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #endif using namespace fmt::literals; +using namespace std::literals; /* @@ -126,10 +127,10 @@ public: m_Headers.reserve(16); zen::ExtendableStringBuilder<256> RequestBody; - RequestBody << "GET " << Path << " HTTP/1.1\r\n"; - RequestBody << "Host: " << Server << "\r\n"; - RequestBody << "Accept: */*\r\n"; - RequestBody << "Connection: " << (m_KeepAlive ? "keep-alive" : "close") << "\r\n\r\n"; // TODO: support keep-alive + RequestBody << "GET "sv << Path << " HTTP/1.1\r\n"sv; + RequestBody << "Host: "sv << Server << "\r\n"sv; + RequestBody << "Accept: */*\r\n"sv; + RequestBody << "Connection: "sv << (m_KeepAlive ? "keep-alive"sv : "close"sv) << "\r\n\r\n"sv; // TODO: support keep-alive m_RequestBody = RequestBody; @@ -460,12 +461,12 @@ using namespace spdlog; using namespace spdlog::details; using namespace std::literals; -class full_formatter final : public spdlog::formatter +class full_test_formatter final : public spdlog::formatter { public: - full_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} + full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} - virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_formatter>(m_LogId, m_Epoch); } + virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); } static constexpr bool UseDate = false; @@ -680,7 +681,7 @@ main(int argc, char** argv) zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); - spdlog::set_formatter(std::make_unique<::logging::full_formatter>("test", std::chrono::system_clock::now())); + spdlog::set_formatter(std::make_unique<::logging::full_test_formatter>("test", std::chrono::system_clock::now())); std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; @@ -1704,11 +1705,12 @@ TEST_CASE("zcache.policy") zen::IoHash Key; zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); // Store package locally { + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + CHECK(Package.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, @@ -1761,11 +1763,12 @@ TEST_CASE("zcache.policy") zen::IoHash Key; zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); // Store package upstream { + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + CHECK(Package.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, diff --git a/zenserver/cache/cacheagent.cpp b/zenserver/cache/cacheagent.cpp deleted file mode 100644 index f4d1cabe6..000000000 --- a/zenserver/cache/cacheagent.cpp +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "cacheagent.h" - -#include <gsl/gsl-lite.hpp> diff --git a/zenserver/cache/cacheagent.h b/zenserver/cache/cacheagent.h deleted file mode 100644 index 145d0f79f..000000000 --- a/zenserver/cache/cacheagent.h +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -class CacheAgent -{ -public: -private: -}; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e7b840ed8..119062833 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -6,18 +6,26 @@ #include <zencore/windows.h> #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> -#include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> +#include <zenstore/gc.h> #include <concepts> #include <filesystem> +#include <ranges> #include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START @@ -31,7 +39,7 @@ namespace zen { using namespace fmt::literals; -ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +ZenCacheStore::ZenCacheStore(const std::filesystem::path& RootDir) : m_DiskLayer{RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); @@ -125,7 +133,8 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) void ZenCacheStore::GarbageCollect(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + m_DiskLayer.GarbageCollect(GcCtx); + m_MemLayer.GarbageCollect(GcCtx); } ////////////////////////////////////////////////////////////////////////// @@ -150,7 +159,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa return false; } - CacheBucket* Bucket = Bucket = &it->second; + CacheBucket* Bucket = &it->second; _.ReleaseNow(); @@ -213,7 +222,12 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) void ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.GarbageCollect(GcCtx); + } } void @@ -302,110 +316,50 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue ////////////////////////////////////////////////////////////////////////// -#pragma pack(push) -#pragma pack(1) +inline DiskLocation::DiskLocation() = default; -struct DiskLocation +inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) +: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) +, LowerSize(ValueSize & 0xFFFFffff) +, IndexDataSize(IndexSize) { - inline DiskLocation() = default; +} - inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) - : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) - , LowerSize(ValueSize & 0xFFFFffff) - , IndexDataSize(IndexSize) - { - } +inline uint64_t DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) +{ + return Offset | Flags; +} - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; +inline uint64_t DiskLocation::Offset() const +{ + return OffsetAndFlags & kOffsetMask; +} - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } +inline uint64_t DiskLocation::Size() const +{ + return LowerSize; +} - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t Size() const { return LowerSize; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } - inline ZenContentType GetContentType() const - { - ZenContentType ContentType = ZenContentType::kBinary; +inline uint64_t DiskLocation::IsFlagSet(uint64_t Flag) const +{ + return OffsetAndFlags & Flag; +} - if (IsFlagSet(DiskLocation::kStructured)) - { - ContentType = ZenContentType::kCbObject; - } +inline ZenContentType DiskLocation::GetContentType() const +{ + ZenContentType ContentType = ZenContentType::kBinary; - return ContentType; + if (IsFlagSet(DiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; } -private: - uint64_t OffsetAndFlags = 0; - uint32_t LowerSize = 0; - uint32_t IndexDataSize = 0; -}; - -struct DiskIndexEntry -{ - IoHash Key; - DiskLocation Location; -}; - -#pragma pack(pop) + return ContentType; +} -static_assert(sizeof(DiskIndexEntry) == 36); +////////////////////////////////////////////////////////////////////////// -struct ZenCacheDiskLayer::CacheBucket -{ - CacheBucket(CasStore& Cas); - ~CacheBucket(); - - void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - static bool Delete(std::filesystem::path BucketDir); - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); - - inline bool IsOk() const { return m_IsOk; } - -private: - CasStore& m_CasStore; - std::filesystem::path m_BucketDir; - Oid m_BucketId; - bool m_IsOk = false; - uint64_t m_LargeObjectThreshold = 64 * 1024; - - // These files are used to manage storage of small objects for this bucket - - BasicFile m_SobsFile; - TCasLogFile<DiskIndexEntry> m_SlogFile; - - RwLock m_IndexLock; - tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; - - void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - - // These locks are here to avoid contention on file creation, therefore it's sufficient - // that we take the same lock for the same hash - // - // These locks are small and should really be spaced out so they don't share cache lines, - // but we don't currently access them at particularly high frequency so it should not be - // an issue in practice - - RwLock m_ShardedLocks[256]; - inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } -}; - -ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +ZenCacheDiskLayer::CacheBucket::CacheBucket() { } @@ -839,7 +793,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } @@ -873,7 +827,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path BucketPath = m_RootDir; @@ -916,7 +870,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path bucketPath = m_RootDir; @@ -972,7 +926,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } else { - auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + auto InsertResult = m_Buckets.try_emplace(BucketName8); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName8; @@ -1050,7 +1004,64 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) void ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.GarbageCollect(GcCtx); + } +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("z$.store") +{ + using namespace fmt::literals; + using namespace std::literals; + + ScopedTemporaryDirectory TempDir; + + ZenCacheStore Zcs(TempDir.Path() / "cache"); + + const int kIterationCount = 100; + + for (int i = 0; i < kIterationCount; ++i) + { + const IoHash Key = IoHash::HashBuffer(&i, sizeof i); + + CbObjectWriter Cbo; + Cbo << "hey" << i; + CbObject Obj = Cbo.Save(); + + ZenCacheValue Value; + Value.Value = Obj.GetBuffer().AsIoBuffer(); + Value.Value.SetContentType(ZenContentType::kCbObject); + + Zcs.Put("test_bucket"sv, Key, Value); + } + + for (int i = 0; i < kIterationCount; ++i) + { + const IoHash Key = IoHash::HashBuffer(&i, sizeof i); + + ZenCacheValue Value; + Zcs.Get("test_bucket"sv, Key, /* out */ Value); + + REQUIRE(Value.Value); + CHECK(Value.Value.GetContentType() == ZenContentType::kCbObject); + CHECK_EQ(ValidateCompactBinary(Value.Value, CbValidateMode::All), CbValidateError::None); + CbObject Obj = LoadCompactBinaryObject(Value.Value); + CHECK_EQ(Obj["hey"].AsInt32(), i); + } +} + +#endif + +void +z$_forcelink() +{ } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 4753af627..6521d8393 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -7,7 +7,9 @@ #include <zencore/iohash.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/basicfile.h> #include <zenstore/cas.h> +#include <zenstore/caslog.h> #pragma warning(push) #pragma warning(disable : 4127) @@ -98,10 +100,46 @@ private: Configuration m_Configuration; }; +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; + + DiskLocation(); + DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags); + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags); + uint64_t Offset() const; + uint64_t Size() const; + uint64_t IsFlagSet(uint64_t Flag) const; + ZenContentType GetContentType() const; + +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; +}; + +struct DiskIndexEntry +{ + IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 36); + class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); + explicit ZenCacheDiskLayer(const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -117,9 +155,54 @@ private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket; + struct CacheBucket + { + CacheBucket(); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + static bool Delete(std::filesystem::path BucketDir); + + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + inline bool IsOk() const { return m_IsOk; } + + private: + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_IsOk = false; + uint64_t m_LargeObjectThreshold = 64 * 1024; + + // These files are used to manage storage of small objects for this bucket + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + + RwLock m_IndexLock; + tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; + + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + + RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } + }; - CasStore& m_CasStore; std::filesystem::path m_RootDir; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive @@ -128,7 +211,7 @@ private: class ZenCacheStore { public: - ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); + explicit ZenCacheStore(const std::filesystem::path& RootDir); ~ZenCacheStore(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -146,4 +229,6 @@ private: uint64_t m_LastScrubTime = 0; }; +void z$_forcelink(); + } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index cbdbebb03..f512f8015 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -234,6 +234,13 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", + "upstream-zen-dns", + "DNS that resolves to one or more Zen server instance(s)", + cxxopts::value<std::vector<std::string>>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Dns)->default_value(""), + ""); + + options.add_option("cache", + "", "upstream-thread-count", "Number of threads used for upstream procsssing", cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), diff --git a/zenserver/config.h b/zenserver/config.h index 2a5a17fb9..72a4f31bb 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -42,6 +42,7 @@ struct ZenUpstreamJupiterConfig struct ZenUpstreamZenConfig { std::vector<std::string> Urls; + std::vector<std::string> Dns; }; enum class UpstreamCachePolicy : uint8_t diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h index 42f928efe..759df58d3 100644 --- a/zenserver/diag/formatters.h +++ b/zenserver/diag/formatters.h @@ -2,6 +2,11 @@ #pragma once +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> #include <fmt/format.h> @@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response> { using namespace std::literals; - if (Response.status_code == 200) + if (Response.status_code == 200 || Response.status_code == 201) { return fmt::format_to(Ctx.out(), "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", @@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response> const auto It = Response.header.find("Content-Type"); const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv; - const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || - ContentType == "application/octet-stream"; - - if (IsBinary) + if (ContentType == "application/x-ue-cb"sv) { + zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::CbObjectView Obj(Body.Data()); + zen::ExtendableStringBuilder<256> Sb; + std::string_view Json = Obj.ToJson(Sb).ToView(); + return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'", + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", Response.url.str(), Response.status_code, Response.uploaded_bytes, Response.downloaded_bytes, Response.elapsed, + Json, Response.reason); } else diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 556a2124d..7a36b5841 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -87,12 +87,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -137,6 +137,7 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -172,6 +173,7 @@ CloudCacheSession::GetBlob(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -203,6 +205,7 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -234,6 +237,7 @@ CloudCacheSession::GetObject(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -383,6 +387,7 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -546,6 +551,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 168449d05..c0cd858b6 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -175,8 +175,10 @@ namespace detail { IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { + using namespace fmt::literals; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const size_t MaxAttempts = 3; try { @@ -204,117 +206,112 @@ namespace detail { } else { - bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) - { - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { + for (const IoHash& PayloadId : PayloadIds) { - if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - Result.Success) + const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + + if (It == std::end(CacheRecord.PayloadIds)) { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; - break; + OutReason = "payload '{}' MISSING from local cache"_format(PayloadId); + return false; } - } - if (!Success) - { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + + CloudCacheResult BlobResult; + for (size_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + { + BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!BlobResult.Success) + { + OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); + return false; + } + + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; } + + return true; + }; + + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + if (!RefResult.Success) { - if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - Result.Success) - { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RefResult.Reason), + .Success = false}; + } - if (!Result.Needs.empty()) - { - for (const IoHash& NeededHash : Result.Needs) - { - Success = false; - - if (auto It = - std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); - It != std::end(CacheRecord.PayloadIds)) - { - const size_t Idx = It - std::begin(CacheRecord.PayloadIds); - - if (CloudCacheResult BlobResult = - Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - BlobResult.Success) - { - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - Success = true; - } - else - { - ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } - else - { - ZEN_WARN("needed payload '{}/{}/{}' MISSING", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - const IoHash RefHash = IoHash::HashBuffer(RecordValue); + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - if (FinalizeRefResult FinalizeResult = - Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); - FinalizeResult.Success) - { - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - Success = true; - - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MissingHash); - } - } - else - { - ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - Success = false; - } - } + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } + + if (!FinalizeResult.Needs.empty()) + { + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } + + FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } - if (Success) + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) { - break; + Sb << MissingHash.ToHexString() << ","; } + + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Sb.ToString()), + .Success = false}; } } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) @@ -890,6 +887,15 @@ private: { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -907,7 +913,10 @@ private: } catch (std::exception& e) { - ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + ZEN_WARN("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + e.what()); } } diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index de30c5425..9a8090fc0 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -8,11 +8,13 @@ #include <zencore/logging.h> #include <zencore/refcount.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zencore/windows.h> #include <zenhttp/httpserver.h> +#include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/cidstore.h> #include <zenutil/zenserverprocess.h> @@ -115,6 +117,37 @@ namespace zen { using namespace std::literals; using namespace fmt::literals; +namespace utils { + asio::error_code ResolveHostname(asio::io_context& Ctx, + std::string_view Host, + std::string_view DefaultPort, + std::vector<std::string>& OutEndpoints) + { + std::string_view Port = DefaultPort; + + if (const size_t Idx = Host.find(":"); Idx != std::string_view::npos) + { + Port = Host.substr(Idx + 1); + Host = Host.substr(0, Idx); + } + + asio::ip::tcp::resolver Resolver(Ctx); + + asio::error_code ErrorCode; + asio::ip::tcp::resolver::results_type Endpoints = Resolver.resolve(Host, Port, ErrorCode); + + if (!ErrorCode) + { + for (const asio::ip::tcp::endpoint& Ep : Endpoints) + { + OutEndpoints.push_back("http://{}:{}"_format(Ep.address().to_string(), Ep.port())); + } + } + + return ErrorCode; + } +} // namespace utils + class ZenServer : public IHttpStatusProvider { public: @@ -295,11 +328,13 @@ public: const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; - m_CurrentState = kRunning; + SetNewState(kRunning); + + OnReady(); m_Http->Run(IsInteractiveMode); - m_CurrentState = kShuttingDown; + SetNewState(kShuttingDown); ZEN_INFO(ZEN_APP_NAME " exiting"); @@ -321,6 +356,10 @@ public: void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } + std::function<void()> m_IsReadyFunc; + void SetIsReadyFunc(std::function<void()>&& IsReadyFunc) { m_IsReadyFunc = std::move(IsReadyFunc); } + void OnReady(); + void EnsureIoRunner() { if (!m_IoRunner.joinable()) @@ -434,6 +473,8 @@ private: kShuttingDown } m_CurrentState = kInitializing; + inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } + std::string_view ToString(ServerState Value) { switch (Value) @@ -475,6 +516,17 @@ private: }; void +ZenServer::OnReady() +{ + m_ServerEntry->SignalReady(); + + if (m_IsReadyFunc) + { + m_IsReadyFunc(); + } +} + +void ZenServer::InitializeState(ZenServiceConfig& ServiceConfig) { // Check root manifest to deal with schema versioning @@ -575,7 +627,7 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + m_CacheStore = std::make_unique<ZenCacheStore>(m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) @@ -595,12 +647,29 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); - if (!UpstreamConfig.ZenConfig.Urls.empty()) + // Zen upstream { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); - UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls; + if (!UpstreamConfig.ZenConfig.Dns.empty()) + { + for (const std::string& Dns : UpstreamConfig.ZenConfig.Dns) + { + const asio::error_code Err = zen::utils::ResolveHostname(m_IoContext, Dns, "1337"sv, ZenUrls); + if (Err) + { + ZEN_ERROR("resolve '{}' FAILED, reason '{}'", Err.message()); + } + } + } + + if (!ZenUrls.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } } + // Jupiter upstream { zen::CloudCacheClientOptions Options; if (UpstreamConfig.JupiterConfig.UseProductionSettings) @@ -681,6 +750,7 @@ public: private: ZenServerOptions& m_GlobalOptions; ZenServiceConfig& m_ServiceConfig; + zen::LockFile m_LockFile; }; int @@ -703,6 +773,32 @@ ZenWindowsService::Run() try { + // Mutual exclusion and synchronization + + std::error_code Ec; + + std::filesystem::path LockFilePath = GlobalOptions.DataDir / ".lock"; + + bool IsReady = false; + + auto MakeLockData = [&] { + CbObjectWriter Cbo; + Cbo << "pid" << _getpid() << "data" << ToUtf8(GlobalOptions.DataDir) << "port" << GlobalOptions.BasePort << "session_id" + << GetSessionId() << "ready" << IsReady; + return Cbo.Save(); + }; + + m_LockFile.Create(LockFilePath, MakeLockData(), Ec); + + if (Ec) + { + ConsoleLog().error("ERROR: Unable to grab lock at '{}' (error: '{}')", LockFilePath, Ec.message()); + + std::exit(99); + } + + InitializeLogging(GlobalOptions); + // Prototype config system, we'll see how this pans out // // TODO: we need to report any parse errors here @@ -764,13 +860,19 @@ ZenWindowsService::Run() }}); // If we have a parent process, establish the mechanisms we need - // to be able to communicate with the parent + // to be able to communicate readiness with the parent - if (!GlobalOptions.ChildId.empty()) - { - zen::NamedEvent ParentEvent{GlobalOptions.ChildId}; - ParentEvent.Set(); - } + Server.SetIsReadyFunc([&] { + IsReady = true; + + m_LockFile.Update(MakeLockData(), Ec); + + if (!GlobalOptions.ChildId.empty()) + { + zen::NamedEvent ParentEvent{GlobalOptions.ChildId}; + ParentEvent.Set(); + } + }); Server.Run(); Server.Cleanup(); @@ -788,6 +890,22 @@ ZenWindowsService::Run() return 0; } +#if ZEN_WITH_TESTS +int +test_main(int argc, char** argv) +{ + zen::zencore_forcelinktests(); + zen::zenhttp_forcelinktests(); + zen::zenstore_forcelinktests(); + + zen::logging::InitializeLogging(); + + spdlog::set_level(spdlog::level::debug); + + return doctest::Context(argc, argv).run(); +} +#endif + int main(int argc, char* argv[]) { @@ -797,6 +915,16 @@ main(int argc, char* argv[]) mi_version(); #endif +#if ZEN_WITH_TESTS + if (argc >= 2) + { + if (argv[1] == "test"sv) + { + return test_main(argc, argv); + } + } +#endif + try { ZenServerOptions GlobalOptions; @@ -809,8 +937,6 @@ main(int argc, char* argv[]) std::filesystem::create_directories(GlobalOptions.DataDir); } - InitializeLogging(GlobalOptions); - #if ZEN_PLATFORM_WINDOWS if (GlobalOptions.InstallService) { diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index b670582e7..d954d3f8d 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -120,7 +120,6 @@ <ClInclude Include="testing\httptest.h" /> <ClInclude Include="upstream\jupiter.h" /> <ClInclude Include="projectstore.h" /> - <ClInclude Include="cache\cacheagent.h" /> <ClInclude Include="testing\launch.h" /> <ClInclude Include="casstore.h" /> <ClInclude Include="diag\diagsvcs.h" /> @@ -142,7 +141,6 @@ <ClCompile Include="monitoring\httpstats.cpp" /> <ClCompile Include="monitoring\httpstatus.cpp" /> <ClCompile Include="projectstore.cpp" /> - <ClCompile Include="cache\cacheagent.cpp" /> <ClCompile Include="sos\sos.cpp" /> <ClCompile Include="testing\httptest.cpp" /> <ClCompile Include="upstream\jupiter.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index b87fa0016..04c6267ba 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -5,9 +5,6 @@ <ClInclude Include="projectstore.h" /> <ClInclude Include="casstore.h" /> <ClInclude Include="testing\launch.h" /> - <ClInclude Include="cache\cacheagent.h"> - <Filter>cache</Filter> - </ClInclude> <ClInclude Include="diag\diagsvcs.h"> <Filter>diag</Filter> </ClInclude> @@ -49,9 +46,6 @@ <ClCompile Include="zenserver.cpp" /> <ClCompile Include="projectstore.cpp" /> <ClCompile Include="casstore.cpp" /> - <ClCompile Include="cache\cacheagent.cpp"> - <Filter>cache</Filter> - </ClCompile> <ClCompile Include="experimental\usnjournal.cpp"> <Filter>experimental</Filter> </ClCompile> diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index bbb9e1036..9ed70a5ec 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -2,6 +2,7 @@ #include "zenstore/basicfile.h" +#include <zencore/compactbinary.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -266,6 +267,53 @@ TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std:: std::filesystem::rename(m_TempPath, FinalFileName, Ec); } +////////////////////////////////////////////////////////////////////////// + +LockFile::LockFile() +{ +} + +LockFile::~LockFile() +{ +} + +void +LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec) +{ + Ec.clear(); + + const DWORD dwCreationDisposition = CREATE_ALWAYS; + DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE | DELETE; + const DWORD dwShareMode = FILE_SHARE_READ; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE; + HANDLE hTemplateFile = nullptr; + + HANDLE FileHandle = CreateFile(FileName.c_str(), + dwDesiredAccess, + dwShareMode, + /* lpSecurityAttributes */ nullptr, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + Ec = zen::MakeErrorCodeFromLastError(); + + return; + } + + m_FileHandle = FileHandle; + + BasicFile::Write(Payload.GetBuffer(), 0, Ec); +} + +void +LockFile::Update(CbObject Payload, std::error_code& Ec) +{ + BasicFile::Write(Payload.GetBuffer(), 0, Ec); +} + /* ___________ __ \__ ___/___ _______/ |_ ______ diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h index 7ae35dea6..e4414787c 100644 --- a/zenstore/include/zenstore/basicfile.h +++ b/zenstore/include/zenstore/basicfile.h @@ -12,6 +12,8 @@ namespace zen { +class CbObject; + /** * Probably the most basic file abstraction in the universe * @@ -80,6 +82,22 @@ private: using BasicFile::Open; }; +/** Lock file abstraction + + */ + +class LockFile : protected BasicFile +{ +public: + LockFile(); + ~LockFile(); + + void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec); + void Update(CbObject Payload, std::error_code& Ec); + +private: +}; + ZENCORE_API void basicfile_forcelink(); } // namespace zen diff --git a/zenutil/include/zenutil/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h index 09728aa1a..8a4f9604d 100644 --- a/zenutil/include/zenutil/zenserverprocess.h +++ b/zenutil/include/zenutil/zenserverprocess.h @@ -94,17 +94,22 @@ public: struct ZenServerEntry { + // NOTE: any changes to this should consider backwards compatibility + // which means you should not rearrange members only potentially + // add something to the end or use a different mechanism for + // additional state. For example, you can use the session ID + // to introduce additional named objects std::atomic<uint32_t> Pid; std::atomic<uint16_t> ListenPort; std::atomic<uint16_t> Flags; uint8_t SessionId[12]; - std::atomic<uint32_t> SponsorPids[32]; + std::atomic<uint32_t> SponsorPids[8]; uint8_t Padding[12]; - uint8_t Padding2[96]; enum class FlagsEnum : uint16_t { - kShutdownPlease = 1 << 0 + kShutdownPlease = 1 << 0, + kIsReady = 1 << 1, }; FRIEND_ENUM_CLASS_FLAGS(FlagsEnum); @@ -112,10 +117,11 @@ public: Oid GetSessionId() const { return Oid::FromMemory(SessionId); } void Reset(); void SignalShutdownRequest(); + void SignalReady(); bool AddSponsorProcess(uint32_t Pid); }; - static_assert(sizeof(ZenServerEntry) == 256); + static_assert(sizeof(ZenServerEntry) == 64); void Initialize(); [[nodiscard]] bool InitializeReadOnly(); @@ -128,7 +134,7 @@ public: private: void* m_hMapFile = nullptr; ZenServerEntry* m_Data = nullptr; - int m_MaxEntryCount = 131072 / sizeof(ZenServerEntry); + int m_MaxEntryCount = 65536 / sizeof(ZenServerEntry); ZenServerEntry* m_OurEntry = nullptr; bool m_IsReadOnly = true; }; diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 55b592ab1..4098954a8 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -269,6 +269,12 @@ ZenServerState::ZenServerEntry::SignalShutdownRequest() Flags |= uint16_t(FlagsEnum::kShutdownPlease); } +void +ZenServerState::ZenServerEntry::SignalReady() +{ + Flags |= uint16_t(FlagsEnum::kIsReady); +} + bool ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) { |