aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hydration.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/hydration.cpp')
-rw-r--r--src/zenserver/hub/hydration.cpp191
1 files changed, 183 insertions, 8 deletions
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index cf36d8646..673306cde 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -87,22 +87,23 @@ namespace hydration_impl {
virtual void Configure(std::string_view ModuleId,
const std::filesystem::path& TempDir,
std::string_view TargetSpecification,
- const CbObject& Options) = 0;
- virtual void SaveMetadata(const CbObject& Data) = 0;
- virtual CbObject LoadMetadata() = 0;
- virtual CbObject GetSettings() = 0;
- virtual void ParseSettings(const CbObjectView& Settings) = 0;
- virtual std::vector<IoHash> List() = 0;
+ const CbObject& Options) = 0;
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ virtual CbObject GetSettings() = 0;
+ virtual void ParseSettings(const CbObjectView& Settings) = 0;
+ virtual std::vector<IoHash> List() = 0;
virtual void Put(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& SourcePath) = 0;
+ const std::filesystem::path& SourcePath) = 0;
virtual void Get(ParallelWork& Work,
WorkerThreadPool& WorkerPool,
const IoHash& Hash,
uint64_t Size,
- const std::filesystem::path& DestinationPath) = 0;
+ const std::filesystem::path& DestinationPath) = 0;
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) = 0;
};
constexpr std::string_view FileHydratorPrefix = "file://";
@@ -226,6 +227,13 @@ namespace hydration_impl {
});
}
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
+ {
+ ZEN_UNUSED(Work);
+ ZEN_UNUSED(WorkerPool);
+ DeleteDirectories(m_StoragePath);
+ }
+
private:
std::filesystem::path m_StoragePath;
std::filesystem::path m_StatePathName;
@@ -513,6 +521,30 @@ namespace hydration_impl {
}
}
+ virtual void Delete(ParallelWork& Work, WorkerThreadPool& WorkerPool) override
+ {
+ std::string Prefix = m_KeyPrefix + "/";
+ S3ListObjectsResult ListResult = m_Client->ListObjects(Prefix);
+ if (!ListResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", Prefix, ListResult.Error);
+ }
+ for (const S3ObjectInfo& Obj : ListResult.Objects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Key = Obj.Key](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ S3Result DelResult = m_Client->DeleteObject(Key);
+ if (!DelResult.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ }
+ });
+ }
+ }
+
private:
std::unique_ptr<S3Client> CreateS3Client() const
{
@@ -569,6 +601,7 @@ public:
virtual void Configure(const HydrationConfig& Config) override;
virtual void Dehydrate(const CbObject& CachedState) override;
virtual CbObject Hydrate() override;
+ virtual void Obliterate() override;
private:
struct Entry
@@ -986,6 +1019,27 @@ IncrementalHydrator::Hydrate()
}
}
+void
+IncrementalHydrator::Obliterate()
+{
+ const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
+ const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+
+ try
+ {
+ ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ m_Storage->Delete(Work, *m_Threading.WorkerPool);
+ Work.Wait();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ }
+
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+}
+
std::unique_ptr<HydrationStrategyBase>
CreateHydrator(const HydrationConfig& Config)
{
@@ -1299,6 +1353,55 @@ TEST_CASE("hydration.file.excluded_files_not_dehydrated")
}
// ---------------------------------------------------------------------------
+// FileHydrator obliterate test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.obliterate")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "obliterate_test";
+ CreateSmallTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ Config.TargetSpecification = "file://" + HydrationStore.string();
+
+ // Dehydrate so the backend store has data
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
+ CHECK(std::filesystem::exists(HydrationStore / ModuleId));
+
+ // Put some files back in ServerStateDir and TempDir to verify cleanup
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+ // Obliterate
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Obliterate();
+ }
+
+ // Backend store directory deleted
+ CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
+ // ServerStateDir cleaned
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ // TempDir cleaned
+ CHECK(std::filesystem::is_empty(HydrationTemp));
+}
+
+// ---------------------------------------------------------------------------
// FileHydrator concurrent test
// ---------------------------------------------------------------------------
@@ -1551,6 +1654,78 @@ TEST_CASE("hydration.s3.concurrent")
}
}
+TEST_CASE("hydration.s3.obliterate")
+{
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19019;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ const std::string ModuleId = "s3test_obliterate";
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ // Dehydrate to populate backend
+ CreateSmallTestTree(ServerStateDir);
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate(CbObject());
+ }
+
+ auto ListModuleObjects = [&]() {
+ S3ClientOptions Opts;
+ Opts.BucketName = "zen-hydration-test";
+ Opts.Endpoint = Minio.Endpoint();
+ Opts.PathStyle = true;
+ Opts.Credentials.AccessKeyId = Minio.RootUser();
+ Opts.Credentials.SecretAccessKey = Minio.RootPassword();
+ S3Client Client(Opts);
+ return Client.ListObjects(ModuleId + "/");
+ };
+
+ // Verify objects exist in S3
+ CHECK(!ListModuleObjects().Objects.empty());
+
+ // Re-populate ServerStateDir and TempDir for cleanup verification
+ CreateSmallTestTree(ServerStateDir);
+ WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+
+ // Obliterate
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Obliterate();
+ }
+
+ // Verify S3 objects deleted
+ CHECK(ListModuleObjects().Objects.empty());
+ // Local directories cleaned
+ CHECK(std::filesystem::is_empty(ServerStateDir));
+ CHECK(std::filesystem::is_empty(HydrationTemp));
+}
+
TEST_CASE("hydration.s3.config_overrides")
{
MinioProcessOptions MinioOpts;