diff options
29 files changed, 775 insertions, 202 deletions
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp index 3283a8b66..043832dd3 100644 --- a/zen/chunk/chunk.cpp +++ b/zen/chunk/chunk.cpp @@ -15,6 +15,7 @@ #include <zencore/thread.h> #include <zencore/timer.h> #include <zenstore/cas.h> +#include <zenstore/gc.h> #include "../internalfile.h" @@ -942,12 +943,14 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::unique_ptr<zen::CasStore> CasStore; + zen::CasGc Gc; + if (!m_RootDirectory.empty()) { zen::CasStoreConfiguration Config; Config.RootDirectory = m_RootDirectory; - CasStore.reset(zen::CreateCasStore()); + CasStore.reset(zen::CreateCasStore(Gc)); CasStore->Initialize(Config); } diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp index aac6afd44..1a73443dc 100644 --- a/zen/cmds/print.cpp +++ b/zen/cmds/print.cpp @@ -3,6 +3,7 @@ #include "print.h" #include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/string.h> @@ -40,9 +41,25 @@ PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_Filename.empty()) throw std::runtime_error("No file specified"); - zen::FileContents Fc = zen::ReadFile(m_Filename); - IoBuffer Data = Fc.Flatten(); - zen::CbObject Object{SharedBuffer(Data)}; + zen::FileContents Fc = zen::ReadFile(m_Filename); + + if (Fc.ErrorCode) + { + zen::ConsoleLog().error("Failed to open file '{}': {}", m_Filename, Fc.ErrorCode.message()); + + return 1; + } + + IoBuffer Data = Fc.Flatten(); + + if (CbValidateError Result = ValidateCompactBinary(Data, CbValidateMode::All); Result != CbValidateError::None) + { + zen::ConsoleLog().error("Data in file '{}' does not appear to be compact binary (validation error {:#x})", m_Filename, Result); + + return 1; + } + + zen::CbObject Object{SharedBuffer(Data)}; zen::StringBuilder<1024> ObjStr; zen::CompactBinaryToJson(Object, ObjStr); diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 7167ab3b5..1873ce48e 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -117,8 +117,9 @@ public: inline ~Ref() { m_Ref && m_Ref->Release(); } template<typename DerivedType> - requires std::derived_from<DerivedType, T> - inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) {} + requires std::derived_from<DerivedType, T> inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) + { + } [[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; } inline explicit operator bool() const { return m_Ref != nullptr; } diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h index d654770d0..6b9a0f658 100644 --- a/zencore/include/zencore/zencore.h +++ b/zencore/include/zencore/zencore.h @@ -62,11 +62,13 @@ #if ZEN_COMPILER_MSC # pragma warning(disable : 4324) // warning C4324: '<type>': structure was padded due to alignment specifier +# pragma warning(default : 4668) // warning C4668: 'symbol' is not defined as a preprocessor macro, replacing with '0' for 'directives' +# pragma warning(default : 4100) // warning C4100: 'identifier' : unreferenced formal parameter #endif #ifndef ZEN_THIRD_PARTY_INCLUDES_START # if ZEN_COMPILER_MSC -# define ZEN_THIRD_PARTY_INCLUDES_START __pragma(warning(push)) __pragma(warning(disable : 4668)) +# define ZEN_THIRD_PARTY_INCLUDES_START __pragma(warning(push)) __pragma(warning(disable : 4668 4127)) # else # define ZEN_THIRD_PARTY_INCLUDES_START # endif diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index 7077942bf..6cee3f60d 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -500,7 +500,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) Handle = DataFile.Detach(); #else - int Fd = open(FileName.native().c_str(), O_RDONLY); + int Fd = open(FileName.native().c_str(), O_RDONLY); if (Fd < 0) { return {}; diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 23be9f729..7b65cdc78 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -53,6 +53,8 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <asio.hpp> +#define ZEN_USE_EXEC 0 // Note: this should really be a global define to match the zenserver definition + ////////////////////////////////////////////////////////////////////////// #include "projectclient.h" @@ -1704,12 +1706,12 @@ TEST_CASE("zcache.policy") zen::IoHash Key; zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); + zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(OriginalPackage); // Store package locally { - CHECK(Package.GetAttachments().size() != 0); + CHECK(OriginalPackage.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); @@ -1761,12 +1763,12 @@ TEST_CASE("zcache.policy") zen::IoHash Key; zen::IoHash PayloadId; - zen::CbPackage Package = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(Package); + zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(OriginalPackage); // Store package upstream { - CHECK(Package.GetAttachments().size() != 0); + CHECK(OriginalPackage.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); @@ -1880,6 +1882,8 @@ TEST_CASE("zcache.policy") } } +# if ZEN_USE_EXEC + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) @@ -2035,7 +2039,7 @@ TEST_CASE("exec.basic") RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); - CHECK(Result["exitcode"].AsInt32(-1) == 0); + CHECK(Result["exitcode"sv].AsInt32(-1) == 0); } { @@ -2044,14 +2048,14 @@ TEST_CASE("exec.basic") RemoteRequest.Prep(); zen::CbObject Result = RemoteRequest.Exec(); - CHECK(Result["exitcode"].AsInt32(-1) == 1); + CHECK(Result["exitcode"sv].AsInt32(-1) == 1); } } TEST_CASE("mesh.basic") { // --mesh option only available with ZEN_ENABLE_MESH -# if ZEN_ENABLE_MESH +# if ZEN_ENABLE_MESH using namespace std::literals; const int kInstanceCount = 4; @@ -2076,9 +2080,11 @@ TEST_CASE("mesh.basic") Instance->WaitUntilReady(); } -# endif +# endif } +# endif + class ZenServerTestHelper { public: diff --git a/zenserver-test/zenserver-test.vcxproj b/zenserver-test/zenserver-test.vcxproj index a39fce7ec..d632e395e 100644 --- a/zenserver-test/zenserver-test.vcxproj +++ b/zenserver-test/zenserver-test.vcxproj @@ -67,7 +67,6 @@ </PropertyGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> @@ -79,7 +78,6 @@ </ItemDefinitionGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <FunctionLevelLinking>true</FunctionLevelLinking> <IntrinsicFunctions>true</IntrinsicFunctions> <SDLCheck>true</SDLCheck> diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 35cb02cbb..ffb6f563a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -148,7 +148,6 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -157,7 +156,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCac , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { @@ -194,7 +192,6 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx) m_LastScrubTime = Ctx.ScrubTimestamp(); - m_CasStore.Scrub(Ctx); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } @@ -716,12 +713,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); + InUpstreamCache = true; } else { @@ -789,9 +782,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index ad7253f79..68a01becb 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -54,7 +54,6 @@ class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CasStore& InCasStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, @@ -98,7 +97,6 @@ private: ZenCacheStore& m_CacheStore; HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; - CasStore& m_CasStore; CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e7b840ed8..f964c3102 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -6,6 +6,7 @@ #include <zencore/windows.h> #include <zencore/compactbinary.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -15,9 +16,11 @@ #include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/gc.h> #include <concepts> #include <filesystem> +#include <memory_resource> #include <unordered_map> ZEN_THIRD_PARTY_INCLUDES_START @@ -31,7 +34,7 @@ namespace zen { using namespace fmt::literals; -ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcContributor(Gc), m_DiskLayer{RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); @@ -80,6 +83,25 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa m_DiskLayer.Put(InBucket, HashKey, Value); +#if ZEN_USE_REF_TRACKING + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None) + { + CbObject Object{SharedBuffer(Value.Value)}; + + uint8_t TempBuffer[8 * sizeof(IoHash)]; + std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer}; + std::pmr::polymorphic_allocator Allocator{&Linear}; + std::pmr::vector<IoHash> CidReferences{Allocator}; + + Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); }); + + m_Gc.OnNewCidReferences(CidReferences); + } + } +#endif + if (Value.Value.Size() <= m_DiskLayerSizeThreshold) { m_MemLayer.Put(InBucket, HashKey, Value); @@ -123,9 +145,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) } void -ZenCacheStore::GarbageCollect(GcContext& GcCtx) +ZenCacheStore::GatherReferences(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + m_MemLayer.GatherReferences(GcCtx); + m_DiskLayer.GatherReferences(GcCtx); } ////////////////////////////////////////////////////////////////////////// @@ -211,7 +234,7 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::GatherReferences(GcContext& GcCtx) { ZEN_UNUSED(GcCtx); } @@ -238,7 +261,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void -ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { // Is it even meaningful to do this? The memory layer shouldn't // contain anything which is not already in the disk layer @@ -358,23 +381,21 @@ static_assert(sizeof(DiskIndexEntry) == 36); struct ZenCacheDiskLayer::CacheBucket { - CacheBucket(CasStore& Cas); + CacheBucket(); ~CacheBucket(); void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); static bool Delete(std::filesystem::path BucketDir); - - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); inline bool IsOk() const { return m_IsOk; } private: - CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; bool m_IsOk = false; @@ -405,7 +426,7 @@ private: inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } }; -ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) +ZenCacheDiskLayer::CacheBucket::CacheBucket() { } @@ -702,7 +723,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } void -ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { RwLock::SharedLockScope _(m_IndexLock); @@ -839,7 +860,7 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir) : m_RootDir(RootDir), m_CasStore(Cas) +ZenCacheDiskLayer::ZenCacheDiskLayer(const std::filesystem::path& RootDir) : m_RootDir(RootDir) { } @@ -873,7 +894,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path BucketPath = m_RootDir; @@ -916,7 +937,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } else { - auto It = m_Buckets.try_emplace(std::string(InBucket), m_CasStore); + auto It = m_Buckets.try_emplace(std::string(InBucket)); Bucket = &It.first->second; std::filesystem::path bucketPath = m_RootDir; @@ -972,7 +993,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } else { - auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + auto InsertResult = m_Buckets.try_emplace(BucketName8); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName8; @@ -1048,9 +1069,14 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } void -ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) +ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_Buckets) + { + Kv.second.GatherReferences(GcCtx); + } } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 4753af627..da3e74126 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -8,11 +8,11 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/cas.h> +#include <zenstore/gc.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) +ZEN_THIRD_PARTY_INCLUDES_END #include <compare> #include <filesystem> @@ -22,6 +22,7 @@ namespace zen { class WideStringBuilderBase; class CasStore; +class CasGc; /****************************************************************************** @@ -50,6 +51,9 @@ struct ZenCacheValue Intended for small values which are frequently accessed + This should have a better memory management policy to maintain reasonable + footprint. + */ class ZenCacheMemoryLayer { @@ -61,7 +65,7 @@ public: void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); struct Configuration { @@ -87,7 +91,7 @@ private: bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); private: uint64_t GetCurrentTimeStamp(); @@ -101,7 +105,7 @@ private: class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheDiskLayer(const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -109,7 +113,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + void GatherReferences(GcContext& GcCtx); void DiscoverBuckets(); @@ -119,30 +123,29 @@ private: */ struct CacheBucket; - CasStore& m_CasStore; std::filesystem::path m_RootDir; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive }; -class ZenCacheStore +class ZenCacheStore : public GcContributor { public: - ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); - bool DropBucket(std::string_view Bucket); - void Flush(); - void Scrub(ScrubContext& Ctx); - void GarbageCollect(GcContext& GcCtx); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool DropBucket(std::string_view Bucket); + void Flush(); + void Scrub(ScrubContext& Ctx); + virtual void GatherReferences(GcContext& GcCtx) override; private: std::filesystem::path m_RootDir; ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; - uint64_t m_DiskLayerSizeThreshold = 4 * 1024; + uint64_t m_DiskLayerSizeThreshold = 1 * 1024; uint64_t m_LastScrubTime = 0; }; diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 6e2559f1f..74c5c2101 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -272,12 +272,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink); spdlog::register_logger(HttpLogger); - // Jupiter - only log HTTP traffic to file + // Jupiter - only log upstream HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); spdlog::register_logger(JupiterLogger); - // Zen - only log HTTP traffic to file + // Zen - only log upstream HTTP traffic to file auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); spdlog::register_logger(ZenClientLogger); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index c9f49217a..83d3986bb 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/logging.h> #include <zencore/uid.h> #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> @@ -9,13 +10,15 @@ #include <zenstore/caslog.h> #include <zenstore/cidstore.h> -#include <tsl/robin_map.h> -#include <zencore/logging.h> #include <filesystem> #include <map> #include <optional> #include <string> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { class CbPackage; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 12e46bd8d..7f55294ce 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -10,12 +10,10 @@ #include <zencore/uid.h> #include <zencore/zencore.h> -#pragma warning(push) -#pragma warning(disable : 4127) +ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> -#pragma warning(pop) - #include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END #include <chrono> diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 00abed513..040d73581 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -19,6 +19,9 @@ #include <zenstore/cidstore.h> #include <zenutil/zenserverprocess.h> +#define ZEN_USE_NAMED_PIPES 0 +#define ZEN_USE_EXEC 0 + #if ZEN_USE_MIMALLOC ZEN_THIRD_PARTY_INCLUDES_START # include <mimalloc-new-delete.h> @@ -43,7 +46,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # define BUILD_VERSION ("dev-build") #endif -#define ZEN_SCHEMA_VERSION 1 +#define ZEN_SCHEMA_VERSION 2 /* latest change by: stefan boberg */ ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring @@ -186,13 +189,18 @@ public: m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects"); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); + +#if ZEN_USE_NAMED_PIPES m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); +#endif ZEN_INFO("instantiating compute services"); +#if ZEN_USE_EXEC std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); +#endif std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; zen::CreateDirectories(ApplySandboxDir); @@ -234,10 +242,12 @@ public: m_Http->RegisterService(*m_StructuredCacheService); } +#if ZEN_USE_EXEC if (m_HttpLaunchService) { m_Http->RegisterService(*m_HttpLaunchService); } +#endif if (m_HttpFunctionService) { @@ -399,6 +409,16 @@ public: NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } + void CollectGarbage() + { + Stopwatch Timer; + ZEN_INFO("Garbage collection STARTING"); + + m_Gc.CollectGarbage(); + + ZEN_INFO("Garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + void Flush() { if (m_CasStore) @@ -462,17 +482,15 @@ private: zen::Ref<zen::HttpServer> m_Http; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; - std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; + zen::CasGc m_Gc; + std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_Gc)}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; - zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr<zen::ProjectStore> m_ProjectStore; - zen::Ref<zen::LocalProjectService> m_LocalProjectService; - std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; zen::HttpAdminService m_AdminService; @@ -481,6 +499,14 @@ private: std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; std::unique_ptr<zen::HttpFrontendService> m_FrontendService; +#if ZEN_USE_EXEC + std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; +#endif + +#if ZEN_USE_NAMED_PIPES + zen::Ref<zen::LocalProjectService> m_LocalProjectService; +#endif + bool m_DebugOptionForcedCrash = false; }; @@ -596,7 +622,7 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + m_CacheStore = std::make_unique<ZenCacheStore>(m_Gc, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) @@ -675,12 +701,8 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) } } - m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, - *m_CasStore, - *m_CidStore, - m_StatsService, - m_StatusService, - std::move(UpstreamCache))); + m_StructuredCacheService.reset( + new zen::HttpStructuredCacheService(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache))); } } // namespace zen diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index a4bbfa340..86c6eb849 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -5,6 +5,9 @@ #include "compactcas.h" #include "filecas.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -14,6 +17,7 @@ #include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/gc.h> #include <gsl/gsl-lite.hpp> @@ -67,34 +71,6 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba ////////////////////////////////////////////////////////////////////////// -struct GcContext::GcState -{ - CasChunkSet m_CasChunks; - CasChunkSet m_CidChunks; -}; - -GcContext::GcContext() : m_State(std::make_unique<GcState>()) -{ -} - -GcContext::~GcContext() -{ -} - -void -GcContext::ContributeCids(std::span<const IoHash> Cids) -{ - m_State->m_CidChunks.AddChunksToSet(Cids); -} - -void -GcContext::ContributeCas(std::span<const IoHash> Cas) -{ - m_State->m_CasChunks.AddChunksToSet(Cas); -} - -////////////////////////////////////////////////////////////////////////// - void ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) { @@ -119,7 +95,7 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) class CasImpl : public CasStore { public: - CasImpl(); + CasImpl(CasGc& Gc); virtual ~CasImpl(); virtual void Initialize(const CasStoreConfiguration& InConfig) override; @@ -128,14 +104,27 @@ public: virtual void FilterChunks(CasChunkSet& InOutChunks) override; virtual void Flush() override; virtual void Scrub(ScrubContext& Ctx) override; + virtual void GarbageCollect(GcContext& GcCtx) override; private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; FileCasStrategy m_LargeStrategy; + CbObject m_ManifestObject; + + enum class StorageScheme + { + Legacy = 0, + WithCbManifest = 1 + }; + + StorageScheme m_StorageScheme = StorageScheme::Legacy; + + bool OpenOrCreateManifest(); + void UpdateManifest(); }; -CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config) +CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config, Gc) { } @@ -155,39 +144,100 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) std::filesystem::create_directories(m_Config.RootDirectory); // Open or create manifest - // - // The manifest is not currently fully implemented. The goal is to - // use it for recovery and configuration + const bool IsNewStore = OpenOrCreateManifest(); + + // Initialize payload storage + + m_LargeStrategy.Initialize(IsNewStore); + m_TinyStrategy.Initialize("tobs", 16, IsNewStore); + m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); +} + +bool +CasImpl::OpenOrCreateManifest() +{ bool IsNewStore = false; - { - std::filesystem::path ManifestPath = m_Config.RootDirectory; - ManifestPath /= ".ucas_root"; + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; - std::error_code Ec; - BasicFile Marker; - Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); + std::error_code Ec; + BasicFile ManifestFile; + ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); - if (Ec) + bool ManifestIsOk = false; + + if (Ec) + { + if (Ec == std::errc::no_such_file_or_directory) { IsNewStore = true; + } + } + else + { + IoBuffer ManifestBuffer = ManifestFile.ReadAll(); + ManifestFile.Close(); - ExtendableStringBuilder<128> manifest; - manifest.Append("#CAS_ROOT\n"); - manifest.Append("ID="); - zen::Oid id = zen::Oid::NewOid(); - id.ToString(manifest); - - Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); - Marker.Write(manifest.c_str(), (DWORD)manifest.Size(), 0); + if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#') + { + // Old-style manifest, does not contain any useful information, so we may as well update it + } + else + { + CbObject Manifest{SharedBuffer(ManifestBuffer)}; + CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All); + + if (ValidationResult == CbValidateError::None) + { + if (Manifest["id"]) + { + ManifestIsOk = true; + } + } + else + { + ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult); + } + + if (ManifestIsOk) + { + m_ManifestObject = std::move(Manifest); + } } } - // Initialize payload storage + if (!ManifestIsOk) + { + UpdateManifest(); + } - m_TinyStrategy.Initialize("tobs", 16, IsNewStore); - m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + return IsNewStore; +} + +void +CasImpl::UpdateManifest() +{ + if (!m_ManifestObject) + { + CbObjectWriter Cbo; + Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now(); + m_ManifestObject = Cbo.Save(); + } + + // Write manifest to file + + std::filesystem::path ManifestPath = m_Config.RootDirectory; + ManifestPath /= ".ucas_root"; + + // This will throw on failure + + ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); + + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Write(m_ManifestObject.GetBuffer(), 0); } CasStore::InsertResult @@ -262,12 +312,18 @@ CasImpl::Scrub(ScrubContext& Ctx) m_LargeStrategy.Scrub(Ctx); } +void +CasImpl::GarbageCollect(GcContext& GcCtx) +{ + m_LargeStrategy.CollectGarbage(GcCtx); +} + ////////////////////////////////////////////////////////////////////////// CasStore* -CreateCasStore() +CreateCasStore(CasGc& Gc) { - return new CasImpl(); + return new CasImpl(Gc); } ////////////////////////////////////////////////////////////////////////// @@ -284,7 +340,9 @@ TEST_CASE("CasStore") CasStoreConfiguration config; config.RootDirectory = TempDir.Path(); - std::unique_ptr<CasStore> Store{CreateCasStore()}; + CasGc Gc; + + std::unique_ptr<CasStore> Store{CreateCasStore(Gc)}; Store->Initialize(config); ScrubContext Ctx; diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 2bac6affd..38d0f818e 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -46,7 +46,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat m_RecordSize = RecordSize; std::error_code Ec; - m_File.Open(FileName, IsCreate); + m_File.Open(FileName, IsCreate, Ec); if (Ec) { @@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat uint64_t AppendOffset = 0; - if (IsCreate) + if (IsCreate || (m_File.FileSize() < sizeof(FileHeader))) { // Initialize log by writing header FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; @@ -76,12 +76,18 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) { - // TODO: provide more context! - throw std::runtime_error("Mangled log header"); + throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName)); } AppendOffset = m_File.FileSize(); - m_Header = Header; + + // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended + + AppendOffset -= sizeof Header; + AppendOffset -= AppendOffset % RecordSize; + AppendOffset += sizeof Header; + + m_Header = Header; } m_AppendOffset = AppendOffset; @@ -125,6 +131,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) { Handler(ReadBuffer.data() + (i * m_RecordSize)); } + + m_AppendOffset = LogBaseOffset + (LogFileSize * LogEntryCount); } void diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 612f87c7c..dbe5572b9 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -4,13 +4,19 @@ #include "CompactCas.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/except.h> +#include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/gc.h> + #include <filesystem> #include <functional> #include <gsl/gsl-lite.hpp> @@ -58,7 +64,7 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { m_LocationMap[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size); + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); }); } @@ -91,7 +97,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const RwLock::ExclusiveLockScope __(m_LocationMapLock); - CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)}; + const CasDiskLocation Location{InsertOffset, ChunkSize}; m_LocationMap[ChunkHash] = Location; @@ -116,7 +122,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash) if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); + + return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); } // Not found @@ -187,11 +194,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (auto& Entry : m_LocationMap) { - const uint64_t EntryOffset = Entry.second.Offset; + const uint64_t EntryOffset = Entry.second.GetOffset(); if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); if (EntryEnd >= WindowEnd) { @@ -201,7 +208,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, + Entry.second.GetSize()); if (Entry.first != ComputedHash) { @@ -222,7 +230,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) for (const CasDiskIndexEntry& Entry : BigChunks) { IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { + m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); @@ -258,6 +266,12 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } void +CasContainerStrategy::CollectGarbage(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void CasContainerStrategy::MakeSnapshot() { RwLock::SharedLockScope _(m_LocationMapLock); @@ -275,4 +289,81 @@ CasContainerStrategy::MakeSnapshot() m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); } +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("cas.compact.gc") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const int kIterationCount = 1000; + + std::vector<IoHash> Keys(kIterationCount); + + { + CasContainerStrategy Cas(CasConfig); + Cas.Initialize("test", 16, true); + + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + const IoHash Hash = HashBuffer(ObjBuffer); + + Cas.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + } + + // Validate that we can still read the inserted data after closing + // the original cas store + + { + CasContainerStrategy Cas(CasConfig); + Cas.Initialize("test", 16, false); + + for (int i = 0; i < kIterationCount; ++i) + { + IoBuffer Chunk = Cas.FindChunk(Keys[i]); + + CHECK(!!Chunk); + + CbObject Value = LoadCompactBinaryObject(Chunk); + + CHECK_EQ(Value["id"].AsInt32(), i); + } + + GcContext Ctx; + Cas.CollectGarbage(Ctx); + } +} + +#endif + +void +compactcas_forcelink() +{ +} + } // namespace zen diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index a512c3d93..a3f3121e6 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -23,17 +23,42 @@ namespace zen { struct CasDiskLocation { - uint64_t Offset; - // If we wanted to be able to store larger chunks using this storage mechanism then - // we could make this more like the IoStore index so we can store larger chunks. - // I.e use five bytes for size and seven for offset - uint32_t Size; + CasDiskLocation(uint64_t InOffset, uint64_t InSize) + { + ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); + ZEN_ASSERT(InSize <= 0xff'ffff'ffff); + + memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); + memcpy(&m_Size[0], &InSize, sizeof m_Size); + } + + CasDiskLocation() = default; + + inline uint64_t GetOffset() const + { + uint64_t Offset = 0; + memcpy(&Offset, &m_Offset, sizeof m_Offset); + return Offset; + } + + inline uint64_t GetSize() const + { + uint64_t Size = 0; + memcpy(&Size, &m_Size, sizeof m_Size); + return Size; + } + +private: + uint8_t m_Offset[5]; + uint8_t m_Size[5]; }; struct CasDiskIndexEntry { IoHash Key; CasDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; }; #pragma pack(pop) @@ -61,6 +86,7 @@ struct CasContainerStrategy void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); + void CollectGarbage(GcContext& GcCtx); private: const CasStoreConfiguration& m_Config; @@ -80,4 +106,6 @@ private: void MakeSnapshot(); }; +void compactcas_forcelink(); + } // namespace zen diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index a37450cd8..0714637c6 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -14,6 +14,11 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> +#include <zenstore/gc.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +#endif #include <gsl/gsl-lite.hpp> @@ -65,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) +: GcStorage(Gc) +, m_Config(Config) +, m_Log(logging::Get("filecas")) { } @@ -73,9 +81,23 @@ FileCasStrategy::~FileCasStrategy() { } +void +FileCasStrategy::Initialize(bool IsNewStore) +{ + m_IsInitialized = true; + + CreateDirectories(m_Config.RootDirectory); + + m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore); + + m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { ZEN_UNUSED(Entry); }); +} + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying @@ -207,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) if (Success) { + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + return CasStore::InsertResult{.New = true}; } @@ -232,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists @@ -304,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // *after* the lock is released due to the initialization order PayloadFile.Close(); + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + return {.New = true}; } IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -320,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { + ZEN_ASSERT(m_IsInitialized); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -332,6 +364,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) return false; } + void FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { @@ -340,11 +373,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + + if (!Ec) + { + m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)}); + } } void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) { + ZEN_ASSERT(m_IsInitialized); + // NOTE: it's not a problem now, but in the future if a GC should happen while this // is in flight, the result could be wrong since chunks could go away in the meantime. // @@ -359,6 +399,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) void FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) { + ZEN_ASSERT(m_IsInitialized); + struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} @@ -430,6 +472,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { + ZEN_ASSERT(m_IsInitialized); + std::vector<IoHash> BadHashes; std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; @@ -476,9 +520,58 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } void -FileCasStrategy::GarbageCollect(GcContext& GcCtx) +FileCasStrategy::CollectGarbage(GcContext& GcCtx) { - ZEN_UNUSED(GcCtx); + ZEN_ASSERT(m_IsInitialized); + + ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + + std::vector<IoHash> ChunksToDelete; + std::atomic<uint64_t> ChunksToDeleteBytes{0}; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + + std::vector<IoHash> CandidateCas; + + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + bool KeepThis = false; + CandidateCas.clear(); + CandidateCas.push_back(Hash); + GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { + ZEN_UNUSED(Hash); + KeepThis = true; + }); + + const uint64_t FileSize = Payload.FileSize(); + + if (!KeepThis) + { + ChunksToDelete.push_back(Hash); + ChunksToDeleteBytes.fetch_add(FileSize); + } + + ++ChunkCount; + ChunkBytes.fetch_add(FileSize); + }); + + ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + + if (ChunksToDelete.empty()) + { + return; + } + + ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); + + for (const IoHash& Hash : ChunksToDelete) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message()); + } + } } ////////////////////////////////////////////////////////////////////////// @@ -489,12 +582,16 @@ TEST_CASE("cas.file.move") { using namespace fmt::literals; - ScopedTemporaryDirectory TempDir{"d:\\filecas_testdir"}; + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + CasGc Gc; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - FileCasStrategy FileCas(CasConfig); + FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -564,6 +661,34 @@ TEST_CASE("cas.file.move") # endif } +TEST_CASE("cas.file.gc") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + CasGc Gc; + FileCasStrategy FileCas(CasConfig, Gc); + FileCas.Initialize(/* IsNewStore */ true); + + for (int i = 0; i < 1000; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + IoHash Hash = HashBuffer(ObjBuffer); + + FileCas.InsertChunk(ObjBuffer, Hash); + } + + GcContext Ctx; + FileCas.CollectGarbage(Ctx); +} + #endif void diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 14314ce52..ec2ca3f31 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -9,6 +9,8 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zenstore/cas.h> +#include <zenstore/caslog.h> +#include <zenstore/gc.h> #include <functional> @@ -23,18 +25,19 @@ class BasicFile; /** CAS storage strategy using a file-per-chunk storage strategy */ -struct FileCasStrategy +struct FileCasStrategy : public GcStorage { - FileCasStrategy(const CasStoreConfiguration& Config); + FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc); ~FileCasStrategy(); + void Initialize(bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(CasChunkSet& InOutChunks); void Flush(); - void GarbageCollect(GcContext& GcCtx); + virtual void CollectGarbage(GcContext& GcCtx) override; void Scrub(ScrubContext& Ctx); private: @@ -43,6 +46,18 @@ private: RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines spdlog::logger& m_Log; spdlog::logger& Log() { return m_Log; } + bool m_IsInitialized = false; + + struct FileCasIndexEntry + { + IoHash Key; + uint32_t Pad = 0; + uint64_t Size = 0; + }; + + static_assert(sizeof(FileCasIndexEntry) == 32); + + TCasLogFile<FileCasIndexEntry> m_CasLog; inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index bfb8f015e..cb03f72ff 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -1,10 +1,77 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenstore/CAS.h> #include <zenstore/gc.h> namespace zen { -CasGc::CasGc(CasStore& Store) : m_CasStore(Store) +////////////////////////////////////////////////////////////////////////// + +struct GcContext::GcState +{ + CasChunkSet m_CasChunks; + CasChunkSet m_CidChunks; +}; + +GcContext::GcContext() : m_State(std::make_unique<GcState>()) +{ +} + +GcContext::~GcContext() +{ +} + +void +GcContext::ContributeCids(std::span<const IoHash> Cids) +{ + m_State->m_CidChunks.AddChunksToSet(Cids); +} + +void +GcContext::ContributeCas(std::span<const IoHash> Cas) +{ + m_State->m_CasChunks.AddChunksToSet(Cas); +} + +void +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +void +GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) +{ + m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); +} + +////////////////////////////////////////////////////////////////////////// + +GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcContributor(this); +} + +GcContributor::~GcContributor() +{ + m_Gc.RemoveGcContributor(this); +} + +////////////////////////////////////////////////////////////////////////// + +GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) +{ + m_Gc.AddGcStorage(this); +} + +GcStorage::~GcStorage() +{ + m_Gc.AddGcStorage(this); +} + +////////////////////////////////////////////////////////////////////////// + +CasGc::CasGc() { } @@ -13,12 +80,52 @@ CasGc::~CasGc() } void +CasGc::AddGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcContribs.push_back(Contributor); +} + +void +CasGc::RemoveGcContributor(GcContributor* Contributor) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); +} + +void +CasGc::AddGcStorage(GcStorage* Storage) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcStorage.push_back(Storage); +} + +void +CasGc::RemoveGcStorage(GcStorage* Storage) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); +} + +void CasGc::CollectGarbage() { } void -CasGc::OnNewReferences(std::span<IoHash> Hashes) +CasGc::OnNewCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes) +{ + ZEN_UNUSED(Hashes); +} + +void +CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index 86e7e78d9..5b508baa0 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -11,6 +11,7 @@ #include <zencore/timer.h> #include <atomic> +#include <concepts> #include <filesystem> #include <functional> #include <memory> @@ -19,6 +20,9 @@ namespace zen { +class GcContext; +class CasGc; + struct CasStoreConfiguration { // Root directory for CAS store @@ -45,29 +49,22 @@ public: inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } + inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc) + { + for (const IoHash& Candidate : Candidates) + { + if (ContainsChunk(Candidate)) + { + MatchFunc(Candidate); + } + } + } + private: // Q: should we protect this with a lock, or is that a higher level concern? std::unordered_set<IoHash> m_ChunkSet; }; -/** Garbage Collection context object - */ - -class GcContext -{ -public: - GcContext(); - ~GcContext(); - - void ContributeCids(std::span<const IoHash> Cid); - void ContributeCas(std::span<const IoHash> Hash); - -private: - struct GcState; - - std::unique_ptr<GcState> m_State; -}; - /** Context object for data scrubbing * * Data scrubbing is when we traverse stored data to validate it and @@ -116,13 +113,14 @@ public: virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; virtual void Flush() = 0; virtual void Scrub(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; protected: CasStoreConfiguration m_Config; uint64_t m_LastScrubTime = 0; }; -ZENCORE_API CasStore* CreateCasStore(); +ZENCORE_API CasStore* CreateCasStore(CasGc& Gc); void CAS_forcelink(); diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 00b987383..065a74b25 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -57,6 +57,8 @@ template<typename T> class TCasLogFile : public CasLogFile { public: + void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } + // This should be called before the Replay() is called to do some basic sanity checking bool Initialize() { return true; } @@ -76,7 +78,6 @@ public: CasLogFile::Append(&Record, sizeof Record); } - void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } }; } // namespace zen diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 5f567e7fc..acfedbc64 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -4,10 +4,13 @@ #include "zenstore.h" -#include <tsl/robin_map.h> #include <zencore/iohash.h> #include <zenstore/CAS.h> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace std::filesystem { class path; } diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 055843547..ef62158ce 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -3,26 +3,99 @@ #pragma once #include <zencore/iohash.h> +#include <zencore/thread.h> #include <span> +#define ZEN_USE_REF_TRACKING 0 // This is not currently functional + namespace zen { class CasStore; +class CasGc; struct IoHash; +/** Garbage Collection context object + */ + +class GcContext +{ +public: + GcContext(); + ~GcContext(); + + void ContributeCids(std::span<const IoHash> Cid); + void ContributeCas(std::span<const IoHash> Hash); + + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); + void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc); + +private: + struct GcState; + + std::unique_ptr<GcState> m_State; +}; + +/** GC root contributor + + Higher level data structures provide roots for the garbage collector, + which ultimately determine what is garbage and what data we need to + retain. + + */ + +class GcContributor +{ +public: + GcContributor(CasGc& Gc); + ~GcContributor(); + + virtual void GatherReferences(GcContext& GcCtx) = 0; + +protected: + CasGc& m_Gc; +}; + +/** GC storage provider + */ + +class GcStorage +{ +public: + GcStorage(CasGc& Gc); + ~GcStorage(); + + virtual void CollectGarbage(GcContext& GcCtrx) = 0; + +private: + CasGc& m_Gc; +}; + +/** GC orchestrator + */ + class CasGc { public: - CasGc(CasStore& Store); + CasGc(); ~CasGc(); + void AddGcContributor(GcContributor* Contributor); + void RemoveGcContributor(GcContributor* Contributor); + + void AddGcStorage(GcStorage* Contributor); + void RemoveGcStorage(GcStorage* Contributor); + void CollectGarbage(); - void OnNewReferences(std::span<IoHash> Hashes); + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); private: - CasStore& m_CasStore; + RwLock m_Lock; + std::vector<GcContributor*> m_GcContribs; + std::vector<GcStorage*> m_GcStorage; }; } // namespace zen diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index d852fa64b..9fdf2dccf 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -4,6 +4,7 @@ #include <zenstore/CAS.h> #include <zenstore/basicfile.h> +#include "compactcas.h" #include "filecas.h" namespace zen { @@ -14,6 +15,7 @@ zenstore_forcelinktests() basicfile_forcelink(); CAS_forcelink(); filecas_forcelink(); + compactcas_forcelink(); } } // namespace zen diff --git a/zenstore/zenstore.vcxproj b/zenstore/zenstore.vcxproj index eb2ecd02b..832ea8159 100644 --- a/zenstore/zenstore.vcxproj +++ b/zenstore/zenstore.vcxproj @@ -97,7 +97,6 @@ </PropertyGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> @@ -111,7 +110,6 @@ </ItemDefinitionGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <FunctionLevelLinking>true</FunctionLevelLinking> <IntrinsicFunctions>true</IntrinsicFunctions> <SDLCheck>true</SDLCheck> diff --git a/zenutil/zenutil.vcxproj b/zenutil/zenutil.vcxproj index 3bf6111f7..20f803e2a 100644 --- a/zenutil/zenutil.vcxproj +++ b/zenutil/zenutil.vcxproj @@ -68,7 +68,6 @@ </PropertyGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> @@ -81,7 +80,6 @@ </ItemDefinitionGroup> <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> <ClCompile> - <WarningLevel>Level3</WarningLevel> <FunctionLevelLinking>true</FunctionLevelLinking> <IntrinsicFunctions>true</IntrinsicFunctions> <SDLCheck>true</SDLCheck> |