diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 15 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 70 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 37 | ||||
| -rw-r--r-- | zenserver/diag/logging.cpp | 4 | ||||
| -rw-r--r-- | zenserver/projectstore.h | 7 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 6 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 46 |
8 files changed, 114 insertions, 73 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 35cb02cbb..ffb6f563a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -148,7 +148,6 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -157,7 +156,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCac , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { @@ -194,7 +192,6 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx) m_LastScrubTime = Ctx.ScrubTimestamp(); - m_CasStore.Scrub(Ctx); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } @@ -716,12 +713,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + InUpstreamCache = true; } else { @@ -789,9 +782,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index ad7253f79..68a01becb 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -54,7 +54,6 @@ class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InCasStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -98,7 +97,6 @@ private: ZenCacheStore& m_CacheStore; HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; - CasStore& m_CasStore; CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e7b840ed8..f964c3102 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -6,6 +6,7 @@ #include <zencore/windows.h> #include <zencore/compactbinary.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -15,9 +16,11 @@ #include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/gc.h> #include <concepts> #include <filesystem> +#include <memory_resource> #include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START @@ -31,7 +34,7 @@ namespace zen { using namespace fmt::literals; -ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcContributor(Gc), m_DiskLayer{RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); @@ -80,6 +83,25 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa m_DiskLayer.Put(InBucket, HashKey, Value); +#if ZEN_USE_REF_TRACKING + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) + { + CbObject Object{SharedBuffer(Value.Value)}; + + uint8_t TempBuffer[8 * sizeof(IoHash)]; + std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; + std::pmr::polymorphic_allocator Allocator{&Linear}; + std::pmr::vector<IoHash> CidReferences{Allocator}; + + Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); + + m_Gc.OnNewCidReferences(CidReferences); + } + } +#endif + if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); @@ -123,9 +145,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } void -ZenCacheStore::GarbageCollect(GcContext& GcCtx) +ZenCacheStore::GatherReferences(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + m_MemLayer.GatherReferences(GcCtx); + m_DiskLayer.GatherReferences(GcCtx); } ////////////////////////////////////////////////////////////////////////// @@ -211,7 +234,7 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::GatherReferences(GcContext& GcCtx) { ZEN_UNUSED(GcCtx); } @@ -238,7 +261,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { // Is it even meaningful to do this? The memory layer shouldn't // contain anything which is not already in the disk layer @@ -358,23 +381,21 @@ static_assert(sizeof(DiskIndexEntry) == 36); struct ZenCacheDiskLayer::CacheBucket { - CacheBucket(CasStore& Cas); + CacheBucket(); ~CacheBucket(); void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); static bool Delete(std::filesystem::path BucketDir); - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); inline bool IsOk() const { return m_IsOk; } private: - CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; bool m_IsOk = false; @@ -405,7 +426,7 @@ private: inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } }; -ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +ZenCacheDiskLayer::CacheBucket::CacheBucket() { } @@ -702,7 +723,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void -ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_IndexLock); @@ -839,7 +860,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } @@ -873,7 +894,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path BucketPath = m_RootDir; @@ -916,7 +937,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path bucketPath = m_RootDir; @@ -972,7 +993,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } else { - auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + auto InsertResult = m_Buckets.try_emplace(BucketName8); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName8; @@ -1048,9 +1069,14 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.GatherReferences(GcCtx); + } } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 4753af627..da3e74126 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -8,11 +8,11 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/cas.h> +#include <zenstore/gc.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) +ZEN_THIRD_PARTY_INCLUDES_END #include <compare> #include <filesystem> @@ -22,6 +22,7 @@ namespace zen { class WideStringBuilderBase; class CasStore; +class CasGc; /****************************************************************************** @@ -50,6 +51,9 @@ struct ZenCacheValue Intended for small values which are frequently accessed + This should have a better memory management policy to maintain reasonable + footprint. + */ class ZenCacheMemoryLayer { @@ -61,7 +65,7 @@ public: void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); struct Configuration { @@ -87,7 +91,7 @@ private: bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); private: uint64_t GetCurrentTimeStamp(); @@ -101,7 +105,7 @@ private: class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheDiskLayer(const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -109,7 +113,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); void DiscoverBuckets(); @@ -119,30 +123,29 @@ private: */ struct CacheBucket; - CasStore& m_CasStore; std::filesystem::path m_RootDir; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive }; -class ZenCacheStore +class ZenCacheStore : public GcContributor { public: - ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool DropBucket(std::string_view Bucket); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + virtual void GatherReferences(GcContext& GcCtx) override; private: std::filesystem::path m_RootDir; ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; - uint64_t m_DiskLayerSizeThreshold = 4 * 1024; + uint64_t m_DiskLayerSizeThreshold = 1 * 1024; uint64_t m_LastScrubTime = 0; }; diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 6e2559f1f..74c5c2101 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -272,12 +272,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink); spdlog::register_logger(HttpLogger); - // Jupiter - only log HTTP traffic to file + // Jupiter - only log upstream HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); spdlog::register_logger(JupiterLogger); - // Zen - only log HTTP traffic to file + // Zen - only log upstream HTTP traffic to file auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); spdlog::register_logger(ZenClientLogger); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index c9f49217a..83d3986bb 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/logging.h> #include <zencore/uid.h> #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> @@ -9,13 +10,15 @@ #include <zenstore/caslog.h> #include <zenstore/cidstore.h> -#include <tsl/robin_map.h> -#include <zencore/logging.h> #include <filesystem> #include <map> #include <optional> #include <string> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class CbPackage; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 12e46bd8d..7f55294ce 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -10,12 +10,10 @@ #include <zencore/uid.h> #include <zencore/zencore.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) - #include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END #include <chrono> diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 00abed513..040d73581 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -19,6 +19,9 @@ #include <zenstore/cidstore.h> #include <zenutil/zenserverprocess.h> +#define ZEN_USE_NAMED_PIPES 0 +#define ZEN_USE_EXEC 0 + #if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include <mimalloc-new-delete.h> @@ -43,7 +46,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # define BUILD_VERSION ("dev-build") #endif -#define ZEN_SCHEMA_VERSION 1 +#define ZEN_SCHEMA_VERSION 2 /* latest change by: stefan boberg */ ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring @@ -186,13 +189,18 @@ public: m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects"); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); + +#if ZEN_USE_NAMED_PIPES m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); +#endif ZEN_INFO("instantiating compute services"); +#if ZEN_USE_EXEC std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); +#endif std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; zen::CreateDirectories(ApplySandboxDir); @@ -234,10 +242,12 @@ public: m_Http->RegisterService(*m_StructuredCacheService); } +#if ZEN_USE_EXEC if (m_HttpLaunchService) { m_Http->RegisterService(*m_HttpLaunchService); } +#endif if (m_HttpFunctionService) { @@ -399,6 +409,16 @@ public: NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } + void CollectGarbage() + { + Stopwatch Timer; + ZEN_INFO("Garbage collection STARTING"); + + m_Gc.CollectGarbage(); + + ZEN_INFO("Garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + void Flush() { if (m_CasStore) @@ -462,17 +482,15 @@ private: zen::Ref<zen::HttpServer> m_Http; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; - std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; + zen::CasGc m_Gc; + std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_Gc)}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; - zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr<zen::ProjectStore> m_ProjectStore; - zen::Ref<zen::LocalProjectService> m_LocalProjectService; - std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; zen::HttpAdminService m_AdminService; @@ -481,6 +499,14 @@ private: std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; std::unique_ptr<zen::HttpFrontendService> m_FrontendService; +#if ZEN_USE_EXEC + std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; +#endif + +#if ZEN_USE_NAMED_PIPES + zen::Ref<zen::LocalProjectService> m_LocalProjectService; +#endif + bool m_DebugOptionForcedCrash = false; }; @@ -596,7 +622,7 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + m_CacheStore = std::make_unique<ZenCacheStore>(m_Gc, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) @@ -675,12 +701,8 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) } } - m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, - *m_CasStore, - *m_CidStore, - m_StatsService, - m_StatusService, - std::move(UpstreamCache))); + m_StructuredCacheService.reset( + new zen::HttpStructuredCacheService(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache))); } } // namespace zen |