diff options
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
| -rw-r--r-- | src/zenserver/hub/hydration.cpp | 191 |
1 files changed, 183 insertions, 8 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index cf36d8646..673306cde 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -87,22 +87,23 @@ namespace hydration_impl { virtual void Configure(std::string_view ModuleId, const std::filesystem::path& TempDir, std::string_view TargetSpecification, - const CbObject& Options) = 0; - virtual void SaveMetadata(const CbObject& Data) = 0; - virtual CbObject LoadMetadata() = 0; - virtual CbObject GetSettings() = 0; - virtual void ParseSettings(const CbObjectView& Settings) = 0; - virtual std::vector<IoHash> List() = 0; + const CbObject& Options) = 0; + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + virtual CbObject GetSettings() = 0; + virtual void ParseSettings(const CbObjectView& Settings) = 0; + virtual std::vector<IoHash> List() = 0; virtual void Put(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, - const std::filesystem::path& SourcePath) = 0; + const std::filesystem::path& SourcePath) = 0; virtual void Get(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, uint64_t Size, - const std::filesystem::path& DestinationPath) = 0; + const std::filesystem::path& DestinationPath) = 0; + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0; }; constexpr std::string_view FileHydratorPrefix = "file://"; @@ -226,6 +227,13 @@ namespace hydration_impl { }); } + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override + { + ZEN_UNUSED(Work); + ZEN_UNUSED(WorkerPool); + DeleteDirectories(m_StoragePath); + } + private: std::filesystem::path m_StoragePath; std::filesystem::path m_StatePathName; @@ -513,6 +521,30 @@ namespace hydration_impl { } } + virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override + { + std::string Prefix = m_KeyPrefix + "/"; + S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix); + if (!ListResult.IsSuccess()) + { + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error); + } + for (const S3ObjectInfo& Obj : ListResult.Objects) + { + Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + S3Result DelResult = m_Client->DeleteObject(Key); + if (!DelResult.IsSuccess()) + { + throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + } + }); + } + } + private: std::unique_ptr<S3Client> CreateS3Client() const { @@ -569,6 +601,7 @@ public: virtual void Configure(const HydrationConfig& Config) override; virtual void Dehydrate(const CbObject& CachedState) override; virtual CbObject Hydrate() override; + virtual void Obliterate() override; private: struct Entry @@ -986,6 +1019,27 @@ IncrementalHydrator::Hydrate() } } +void +IncrementalHydrator::Obliterate() +{ + const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); + const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + + try + { + ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + m_Storage->Delete(Work, *m_Threading.WorkerPool); + Work.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + } + + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); +} + std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) { @@ -1299,6 +1353,55 @@ TEST_CASE("hydration.file.excluded_files_not_dehydrated") } // --------------------------------------------------------------------------- +// FileHydrator obliterate test +// --------------------------------------------------------------------------- + +TEST_CASE("hydration.file.obliterate") +{ + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "obliterate_test"; + CreateSmallTestTree(ServerStateDir); + + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = ModuleId; + Config.TargetSpecification = "file://" + HydrationStore.string(); + + // Dehydrate so the backend store has data + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } + CHECK(std::filesystem::exists(HydrationStore / ModuleId)); + + // Put some files back in ServerStateDir and TempDir to verify cleanup + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); + + // Obliterate + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Obliterate(); + } + + // Backend store directory deleted + CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); + // ServerStateDir cleaned + CHECK(std::filesystem::is_empty(ServerStateDir)); + // TempDir cleaned + CHECK(std::filesystem::is_empty(HydrationTemp)); +} + +// --------------------------------------------------------------------------- // FileHydrator concurrent test // --------------------------------------------------------------------------- @@ -1551,6 +1654,78 @@ TEST_CASE("hydration.s3.concurrent") } } +TEST_CASE("hydration.s3.obliterate") +{ + MinioProcessOptions MinioOpts; + MinioOpts.Port = 19019; + MinioProcess Minio(MinioOpts); + Minio.SpawnMinioServer(); + Minio.CreateBucket("zen-hydration-test"); + + ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser()); + ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword()); + + ScopedTemporaryDirectory TempDir; + + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationTemp); + + const std::string ModuleId = "s3test_obliterate"; + + HydrationConfig Config; + Config.ServerStateDir = ServerStateDir; + Config.TempDir = HydrationTemp; + Config.ModuleId = ModuleId; + { + std::string ConfigJson = + fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})", + Minio.Endpoint()); + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError); + ZEN_ASSERT(ParseError.empty() && Root.IsObject()); + Config.Options = std::move(Root).AsObject(); + } + + // Dehydrate to populate backend + CreateSmallTestTree(ServerStateDir); + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Dehydrate(CbObject()); + } + + auto ListModuleObjects = [&]() { + S3ClientOptions Opts; + Opts.BucketName = "zen-hydration-test"; + Opts.Endpoint = Minio.Endpoint(); + Opts.PathStyle = true; + Opts.Credentials.AccessKeyId = Minio.RootUser(); + Opts.Credentials.SecretAccessKey = Minio.RootPassword(); + S3Client Client(Opts); + return Client.ListObjects(ModuleId + "/"); + }; + + // Verify objects exist in S3 + CHECK(!ListModuleObjects().Objects.empty()); + + // Re-populate ServerStateDir and TempDir for cleanup verification + CreateSmallTestTree(ServerStateDir); + WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); + + // Obliterate + { + std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config); + Hydrator->Obliterate(); + } + + // Verify S3 objects deleted + CHECK(ListModuleObjects().Objects.empty()); + // Local directories cleaned + CHECK(std::filesystem::is_empty(ServerStateDir)); + CHECK(std::filesystem::is_empty(HydrationTemp)); +} + TEST_CASE("hydration.s3.config_overrides") { MinioProcessOptions MinioOpts; |