aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-06-17 07:06:21 -0700
committerGitHub <[email protected]>2022-06-17 07:06:21 -0700
commitc7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch)
tree8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd
parentfixed merge mistake which caused a build error (diff)
downloadzen-0.1.4-pre6.tar.xz
zen-0.1.4-pre6.zip
Make cas storage an hidden implementation detail of CidStore (#130)v0.1.4-pre6v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION - CasStore no longer a public API, it is hidden behind CidStore - Moved cas.h from public header folder - CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash - CasStore now decompresses data to validate content (matching against RawHash) - CasChunkSet renames to HashKeySet and put in separate header/cpp file - Disabled "Chunk" command for now as it relied on CAS being exposed as a service - Changed CAS http service to Cid http server - Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define - Removed "cas.basic" test - Uncommented ".exec.basic" test and added return-skip at start of test - Moved ScrubContext to separate header file - Renamed CasGC to GcManager - Cleaned up configuration passing in cas store classes - Removed CAS stuff from GcContext and clarified naming in class - Remove migration code
-rw-r--r--xmake.lua2
-rw-r--r--zen/chunk/chunk.cpp83
-rw-r--r--zen/chunk/chunk.h2
-rw-r--r--zen/cmds/run.cpp42
-rw-r--r--zen/cmds/run.h4
-rw-r--r--zen/zen.cpp18
-rw-r--r--zenserver-test/zenserver-test.cpp176
-rw-r--r--zenserver/cache/structuredcache.cpp18
-rw-r--r--zenserver/cache/structuredcachestore.cpp728
-rw-r--r--zenserver/cache/structuredcachestore.h10
-rw-r--r--zenserver/cidstore.cpp (renamed from zenserver/casstore.cpp)46
-rw-r--r--zenserver/cidstore.h (renamed from zenserver/casstore.h)17
-rw-r--r--zenserver/compute/function.cpp33
-rw-r--r--zenserver/compute/function.h5
-rw-r--r--zenserver/projectstore.cpp10
-rw-r--r--zenserver/projectstore.h4
-rw-r--r--zenserver/testing/launch.cpp26
-rw-r--r--zenserver/testing/launch.h6
-rw-r--r--zenserver/upstream/hordecompute.cpp12
-rw-r--r--zenserver/upstream/upstreamapply.cpp9
-rw-r--r--zenserver/upstream/upstreamapply.h4
-rw-r--r--zenserver/upstream/upstreamcache.cpp1
-rw-r--r--zenserver/zenserver.cpp35
-rw-r--r--zenstore/cas.cpp90
-rw-r--r--zenstore/cas.h61
-rw-r--r--zenstore/caslog.cpp2
-rw-r--r--zenstore/cidstore.cpp261
-rw-r--r--zenstore/compactcas.cpp745
-rw-r--r--zenstore/compactcas.h16
-rw-r--r--zenstore/filecas.cpp111
-rw-r--r--zenstore/filecas.h23
-rw-r--r--zenstore/gc.cpp174
-rw-r--r--zenstore/hashkeyset.cpp60
-rw-r--r--zenstore/include/zenstore/cas.h144
-rw-r--r--zenstore/include/zenstore/caslog.h2
-rw-r--r--zenstore/include/zenstore/cidstore.h57
-rw-r--r--zenstore/include/zenstore/gc.h44
-rw-r--r--zenstore/include/zenstore/hashkeyset.h54
-rw-r--r--zenstore/include/zenstore/scrubcontext.h40
-rw-r--r--zenstore/zenstore.cpp5
40 files changed, 1033 insertions, 2147 deletions
diff --git a/xmake.lua b/xmake.lua
index 3683c5344..4b13cde5c 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -1,6 +1,6 @@
-- Copyright Epic Games, Inc. All Rights Reserved.
-set_configvar("ZEN_SCHEMA_VERSION", 3) -- changed cas oplog format (p3rl)
+set_configvar("ZEN_SCHEMA_VERSION", 4) -- store Cid data in CAS under raw hash (dan.engelbrecht)
add_requires(
"vcpkg::asio",
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp
index 42ea8eddc..3fd7c4c1f 100644
--- a/zen/chunk/chunk.cpp
+++ b/zen/chunk/chunk.cpp
@@ -2,39 +2,39 @@
#include "chunk.h"
-#include <gsl/gsl-lite.hpp>
-
-#include <zencore/filesystem.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/refcount.h>
-#include <zencore/scopeguard.h>
-#include <zencore/sha1.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/thread.h>
-#include <zencore/timer.h>
-#include <zenstore/cas.h>
-#include <zenstore/gc.h>
-
-#include "../internalfile.h"
-
-#include <lz4.h>
-#include <zstd.h>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <ppl.h>
-# include <ppltasks.h>
-#endif // ZEN_PLATFORM_WINDOWS
-
-#include <cmath>
-#include <filesystem>
-#include <random>
-#include <vector>
+#if 0
+# include <gsl/gsl-lite.hpp>
+
+# include <zencore/filesystem.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/refcount.h>
+# include <zencore/scopeguard.h>
+# include <zencore/sha1.h>
+# include <zencore/string.h>
+# include <zencore/testing.h>
+# include <zencore/thread.h>
+# include <zencore/timer.h>
+# include <zenstore/gc.h>
+
+# include "../internalfile.h"
+
+# include <lz4.h>
+# include <zstd.h>
+
+# if ZEN_PLATFORM_WINDOWS
+# include <ppl.h>
+# include <ppltasks.h>
+# endif // ZEN_PLATFORM_WINDOWS
+
+# include <cmath>
+# include <filesystem>
+# include <random>
+# include <vector>
//////////////////////////////////////////////////////////////////////////
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+# if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
namespace Concurrency {
@@ -75,7 +75,7 @@ struct task_group
} // namespace Concurrency
-#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+# endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
//////////////////////////////////////////////////////////////////////////
@@ -595,9 +595,9 @@ protected:
{
zen::RwLock HashLock;
std::unordered_set<zen::IoHash, zen::IoHash::Hasher> Hashes;
-#if ZEN_PLATFORM_WINDOWS
-# pragma warning(suppress : 4324) // Padding due to alignment
-#endif
+# if ZEN_PLATFORM_WINDOWS
+# pragma warning(suppress : 4324) // Padding due to alignment
+# endif
};
Bucket m_Buckets[256];
@@ -902,7 +902,7 @@ public:
m_BufferManager.ReturnBuffer(Buffer);
-#if 0
+# if 0
Active.AddCount(); // needs fixing
Concurrency::create_task([this, Zfile, CurrentPosition, DataPointer, &Active] {
@@ -934,7 +934,7 @@ public:
_aligned_free(CompressBuffer);
});
-#endif
+# endif
}
StatsBlock& Stats = m_StatsBlock.local();
@@ -990,7 +990,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<zen::CasStore> CasStore;
- zen::CasGc Gc;
+ zen::GcManager Gc;
if (!m_RootDirectory.empty())
{
@@ -1056,14 +1056,14 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
zen::Stopwatch timer;
-#if 1
+# if 1
Concurrency::parallel_for_each(begin(Files), end(Files), [&Chunker](const auto& ThisFile) { Chunker.ChunkFile(ThisFile); });
-#else
+# else
for (const auto& ThisFile : Files)
{
Chunker.ChunkFile(ThisFile);
}
-#endif
+# endif
uint64_t ElapsedMs = timer.GetElapsedTimeMs();
@@ -1106,7 +1106,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
//////////////////////////////////////////////////////////////////////////
-#if ZEN_WITH_TESTS
+# if ZEN_WITH_TESTS
TEST_CASE("chunking")
{
using namespace zen;
@@ -1209,4 +1209,5 @@ TEST_CASE("chunking")
SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); }
}
+# endif
#endif
diff --git a/zen/chunk/chunk.h b/zen/chunk/chunk.h
index f93f7e4f2..32d87b1b7 100644
--- a/zen/chunk/chunk.h
+++ b/zen/chunk/chunk.h
@@ -4,6 +4,7 @@
#include <zencore/zencore.h>
#include "../zen.h"
+#if 0
class ChunkCommand : public ZenCmdBase
{
public:
@@ -21,3 +22,4 @@ private:
size_t m_AverageChunkSize = 0;
bool m_UseCompression = true;
};
+#endif // 0
diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp
index 0fde4648a..86e67b391 100644
--- a/zen/cmds/run.cpp
+++ b/zen/cmds/run.cpp
@@ -4,28 +4,30 @@
#include "run.h"
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/stream.h>
-#include <zencore/string.h>
-#include <zencore/timer.h>
-#include <zenutil/zenserverprocess.h>
-
-#include <filesystem>
+#if ZEN_WITH_EXEC_SERVICES
+
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/stream.h>
+# include <zencore/string.h>
+# include <zencore/timer.h>
+# include <zenutil/zenserverprocess.h>
+
+# include <filesystem>
ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
+# include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END
-#if ZEN_PLATFORM_WINDOWS
-# pragma comment(lib, "Crypt32.lib")
-# pragma comment(lib, "Wldap32.lib")
-# pragma comment(lib, "Ws2_32.lib")
-#endif
+# if ZEN_PLATFORM_WINDOWS
+# pragma comment(lib, "Crypt32.lib")
+# pragma comment(lib, "Wldap32.lib")
+# pragma comment(lib, "Ws2_32.lib")
+# endif
//////////////////////////////////////////////////////////////////////////
@@ -156,7 +158,7 @@ RunCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second);
cpr::Response CasResponse =
- cpr::Post(cpr::Url("http://localhost:13337/cas"), cpr::Body((const char*)FileData.Data(), FileData.Size()));
+ cpr::Post(cpr::Url("http://localhost:13337/cid"), cpr::Body((const char*)FileData.Data(), FileData.Size()));
if (CasResponse.status_code >= 300)
{
@@ -179,3 +181,5 @@ RunCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
} // namespace zen
+
+#endif // ZEN_WITH_EXEC_SERVICES
diff --git a/zen/cmds/run.h b/zen/cmds/run.h
index 3e1e3f2b2..c5595f235 100644
--- a/zen/cmds/run.h
+++ b/zen/cmds/run.h
@@ -4,6 +4,8 @@
#include "../zen.h"
+#if ZEN_WITH_EXEC_SERVICES
+
namespace zen {
/** Execute a command (using Zen)
@@ -24,3 +26,5 @@ private:
};
} // namespace zen
+
+#endif // ZEN_WITH_EXEC_SERVICES
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 302e8496f..5abe4b04e 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -20,7 +20,6 @@
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/zencore.h>
-#include <zenstore/cas.h>
#if ZEN_WITH_TESTS
# define ZEN_TEST_WITH_RUNNER
@@ -109,12 +108,13 @@ main(int argc, char** argv)
auto _ = zen::MakeGuard([] { spdlog::shutdown(); });
- HashCommand HashCmd;
- CopyCommand CopyCmd;
- DedupCommand DedupCmd;
- DropCommand DropCmd;
- ChunkCommand ChunkCmd;
- RunCommand RunCmd;
+ HashCommand HashCmd;
+ CopyCommand CopyCmd;
+ DedupCommand DedupCmd;
+ DropCommand DropCmd;
+#if ZEN_WITH_EXEC_SERVICES
+ RunCommand RunCmd;
+#endif
StatusCommand StatusCmd;
TopCommand TopCmd;
PrintCommand PrintCmd;
@@ -134,14 +134,16 @@ main(int argc, char** argv)
const char* CmdSummary;
} Commands[] = {
// clang-format off
- {"chunk", &ChunkCmd, "Perform chunking"},
+// {"chunk", &ChunkCmd, "Perform chunking"},
{"copy", &CopyCmd, "Copy file(s)"},
{"dedup", &DedupCmd, "Dedup files"},
{"drop", &DropCmd, "Drop cache bucket(s)"},
{"hash", &HashCmd, "Compute file hashes"},
{"print", &PrintCmd, "Print compact binary object"},
{"printpackage", &PrintPkgCmd, "Print compact binary package"},
+#if ZEN_WITH_EXEC_SERVICES
{"run", &RunCmd, "Remote execution"},
+#endif // ZEN_WITH_EXEC_SERVICES
{"status", &StatusCmd, "Show zen status"},
{"ps", &PsCmd, "Enumerate running zen server instances"},
{"top", &TopCmd, "Monitor zen server activity"},
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 46b566a47..0ffb77f3e 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -438,106 +438,6 @@ TEST_CASE("multi.basic")
ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}
-TEST_CASE("cas.basic")
-{
- std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-
- const uint16_t PortNumber = 13337;
-
- const int IterationCount = 1000;
-
- std::vector<int> ChunkSizes(IterationCount);
- std::vector<zen::IoHash> ChunkHashes(IterationCount);
-
- {
- ZenServerInstance Instance1(TestEnv);
- Instance1.SetTestDir(TestDir);
- Instance1.SpawnServer(PortNumber);
- Instance1.WaitUntilReady();
-
- std::atomic<uint64_t> RequestCount{0};
-
- zen::Stopwatch timer;
-
- std::mt19937_64 mt;
-
- auto BaseUri = fmt::format("http://localhost:{}/cas", PortNumber);
-
- cpr::Session cli;
- cli.SetUrl(cpr::Url{BaseUri});
-
- // Populate CAS with some generated data
-
- for (int i = 0; i < IterationCount; ++i)
- {
- const int ChunkSize = mt() % 10000 + 5;
- std::string body = fmt::format("{}", i);
- body.resize(ChunkSize, ' ');
-
- ChunkSizes[i] = ChunkSize;
- ChunkHashes[i] = zen::IoHash::HashBuffer(body.data(), body.size());
-
- cli.SetBody(body);
-
- auto res = cli.Post();
- CHECK(!res.error);
-
- ++RequestCount;
- }
-
- // Verify that the chunks persisted
-
- for (int i = 0; i < IterationCount; ++i)
- {
- zen::ExtendableStringBuilder<128> Uri;
- Uri << BaseUri << "/";
- ChunkHashes[i].ToHexString(Uri);
-
- auto res = cpr::Get(cpr::Url{Uri.c_str()});
- CHECK(!res.error);
- CHECK(res.status_code == 200);
- CHECK(res.text.size() == ChunkSizes[i]);
-
- zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size());
-
- CHECK(ChunkHashes[i] == Hash);
-
- ++RequestCount;
- }
-
- uint64_t Elapsed = timer.GetElapsedTimeMs();
-
- ZEN_INFO("{} requests in {} ({})",
- RequestCount,
- zen::NiceTimeSpanMs(Elapsed),
- zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
- }
-
- // Verify that the data persists between process runs (the previous server has exited at this point)
-
- {
- ZenServerInstance Instance2(TestEnv);
- Instance2.SetTestDir(TestDir);
- Instance2.SpawnServer(PortNumber);
- Instance2.WaitUntilReady();
-
- for (int i = 0; i < IterationCount; ++i)
- {
- zen::ExtendableStringBuilder<128> Uri;
- Uri << fmt::format("http://localhost:{}/cas/", PortNumber);
- ChunkHashes[i].ToHexString(Uri);
-
- auto res = cpr::Get(cpr::Url{Uri.c_str()});
- CHECK(res.status_code == 200);
- CHECK(res.text.size() == ChunkSizes[i]);
-
- zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size());
-
- CHECK(ChunkHashes[i] == Hash);
- }
- }
-}
-
TEST_CASE("project.basic")
{
using namespace std::literals;
@@ -2416,11 +2316,11 @@ struct RemoteExecutionRequest
{
zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second);
- cpr::Response CasResponse = cpr::Post(cpr::Url(m_CasUri), cpr::Body((const char*)FileData.Data(), FileData.Size()));
+ cpr::Response CidResponse = cpr::Post(cpr::Url(m_CidUri), cpr::Body((const char*)FileData.Data(), FileData.Size()));
- if (CasResponse.status_code >= 300)
+ if (CidResponse.status_code >= 300)
{
- ZEN_ERROR("CAS put failed with {}", CasResponse.status_code);
+ ZEN_ERROR("CID put failed with {}", CidResponse.status_code);
}
}
else
@@ -2496,45 +2396,47 @@ private:
int m_PortNumber;
std::filesystem::path m_TreePath;
const std::string m_BaseUri = fmt::format("http://{}:{}/exec/jobs", m_HostName, m_PortNumber);
- const std::string m_CasUri = fmt::format("http://{}:{}/cas", m_HostName, m_PortNumber);
+ const std::string m_CidUri = fmt::format("http://{}:{}/cid", m_HostName, m_PortNumber);
Visitor m_Visit{m_TreePath};
zen::BinaryWriter m_MemOut;
};
-// TEST_CASE(".exec.basic" /* * doctest::skip(true) */)
-//{
-// using namespace std::literals;
-//
-// std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
-//
-// const uint16_t PortNumber = 13337;
-//
-// ZenServerInstance Zen1(TestEnv);
-// Zen1.SetTestDir(TestDir);
-// Zen1.SpawnServer(PortNumber);
-// Zen1.WaitUntilReady();
-//
-// std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1");
-//
-// {
-// RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath);
-// RemoteRequest.Build("zentest-appstub.exe", "");
-// RemoteRequest.Prep();
-// zen::CbObject Result = RemoteRequest.Exec();
-//
-// CHECK(Result["exitcode"sv].AsInt32(-1) == 0);
-// }
-//
-// {
-// RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath);
-// RemoteRequest.Build("zentest-appstub.exe", "-f=1");
-// RemoteRequest.Prep();
-// zen::CbObject Result = RemoteRequest.Exec();
-//
-// CHECK(Result["exitcode"sv].AsInt32(-1) == 1);
-// }
-// }
+TEST_CASE(".exec.basic")
+{
+ if (true)
+ {
+ return;
+ }
+ using namespace std::literals;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+
+ const uint16_t PortNumber = 13337;
+
+ ZenServerInstance Zen1(TestEnv);
+ Zen1.SetTestDir(TestDir);
+ Zen1.SpawnServer(PortNumber);
+ Zen1.WaitUntilReady();
+
+ std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1");
+
+ {
+ RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath);
+ RemoteRequest.Build("zentest-appstub.exe", "");
+ RemoteRequest.Prep();
+ zen::CbObject Result = RemoteRequest.Exec();
+ CHECK(Result["exitcode"sv].AsInt32(-1) == 0);
+ }
+
+ {
+ RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath);
+ RemoteRequest.Build("zentest-appstub.exe", "-f=1");
+ RemoteRequest.Prep();
+ zen::CbObject Result = RemoteRequest.Exec();
+ CHECK(Result["exitcode"sv].AsInt32(-1) == 1);
+ }
+}
# endif // ZEN_WITH_EXEC_SERVICES
TEST_CASE("mesh.basic")
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 45bbe062b..1f89e7362 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -16,7 +16,6 @@
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
-#include <zenstore/cas.h>
#include <zenutil/cache/cache.h>
//#include "cachekey.h"
@@ -26,6 +25,7 @@
#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
+#include "zenstore/scrubcontext.h"
#include <algorithm>
#include <atomic>
@@ -859,7 +859,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
- ValidAttachments.emplace_back(InsertResult.DecompressedId);
+ ValidAttachments.emplace_back(Hash);
if (InsertResult.New)
{
@@ -1205,7 +1205,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
- ValidAttachments.emplace_back(InsertResult.DecompressedId);
+ ValidAttachments.emplace_back(ValueHash);
if (InsertResult.New)
{
@@ -2382,7 +2382,7 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
const uint64_t MissCount = m_CacheStats.MissCount;
const uint64_t TotalCount = HitCount + MissCount;
- const CasStoreSize CasSize = m_CidStore.CasSize();
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
const GcStorageSize CacheSize = m_CacheStore.StorageSize();
Cbo.BeginObject("cache");
@@ -2400,12 +2400,12 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
m_UpstreamCache.GetStatus(Cbo);
Cbo.EndObject();
- Cbo.BeginObject("cas");
+ Cbo.BeginObject("cid");
Cbo.BeginObject("size");
- Cbo << "tiny" << CasSize.TinySize;
- Cbo << "small" << CasSize.SmallSize;
- Cbo << "large" << CasSize.LargeSize;
- Cbo << "total" << CasSize.TotalSize;
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
Cbo.EndObject();
Cbo.EndObject();
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 4be33170c..4e7ad522d 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -18,6 +18,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include <xxhash.h>
@@ -66,67 +67,10 @@ namespace {
static_assert(sizeof(CacheBucketIndexHeader) == 32);
- struct LegacyDiskLocation
- {
- inline LegacyDiskLocation() = default;
-
- inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
-
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
- static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
-
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
-
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
-
- if (IsFlagSet(LegacyDiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
-
- if (IsFlagSet(LegacyDiskLocation::kCompressed))
- {
- ContentType = ZenContentType::kCompressedBinary;
- }
-
- return ContentType;
- }
- inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
-
- private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
- };
-
- struct LegacyDiskIndexEntry
- {
- IoHash Key;
- LegacyDiskLocation Location;
- };
-
#pragma pack(pop)
- static_assert(sizeof(LegacyDiskIndexEntry) == 36);
-
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
- const char* LegacyDataExtension = ".sobs";
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
@@ -143,42 +87,6 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
- std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LogExtension);
- }
-
- std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LegacyDataExtension);
- }
-
- bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
- LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
- return false;
- }
- if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- return true;
- }
- uint64_t Size = Entry.Location.Size();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -262,7 +170,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
@@ -583,9 +491,25 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
+ auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ return Error == CbValidateError::None;
+ }
+ if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
for (auto& Kv : m_CacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
+ if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -593,7 +517,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadCasChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadHashes);
}
}
@@ -891,229 +815,6 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
return 0;
};
-uint64_t
-ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
-{
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
-
- if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
- {
- return 0;
- }
-
- ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
-
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- m_BucketDir / m_BucketName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- uint64_t BlockFileSize = 0;
- {
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();
- }
-
- std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
- uint64_t InvalidEntryCount = 0;
-
- size_t BlockChunkCount = 0;
- TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
- LegacyLogPath,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyDiskIndexEntry& Record) {
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- std::string InvalidEntryReason;
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- if (m_Index.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- },
- 0);
-
- std::vector<IoHash> BadEntries;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyDiskIndexEntry& Record(Entry.second);
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- continue;
- }
- if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
- {
- BlockChunkCount++;
- continue;
- }
- ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
- BadEntries.push_back(Entry.first);
- }
- for (const IoHash& BadHash : BadEntries)
- {
- LegacyDiskIndex.erase(BadHash);
- }
- InvalidEntryCount += BadEntries.size();
- }
- }
- if (InvalidEntryCount)
- {
- ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
- }
-
- if (LegacyDiskIndex.empty())
- {
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return 0;
- }
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- CreateDirectories(LogPath.parent_path());
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkIndexToChunkHash.reserve(BlockChunkCount);
- ChunkLocations.reserve(BlockChunkCount);
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
-
- for (const auto& Entry : LegacyDiskIndex)
- {
- const IoHash& ChunkHash = Entry.first;
- const LegacyDiskLocation& Location = Entry.second.Location;
- if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- uint8_t Flags = 0xff & (Location.Flags() >> 56);
- DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
- LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
- continue;
- }
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- TotalSize += Location.Size();
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
-
- m_BlockStore.Split(
- ChunkLocations,
- LegacyDataPath,
- m_BlocksBasePath,
- MaxBlockSize,
- BlockStoreDiskLocation::MaxBlockIndex + 1,
- m_PayloadAlignment,
- CleanSource,
- [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
- const BlockStore::MovedChunksArray& MovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
- LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
- CasLog.Flush();
- if (CleanSource)
- {
- std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- LegacyDiskLocation NewLocation(OldLocation.Offset(),
- OldLocation.Size(),
- 0,
- OldLocation.Flags() | LegacyDiskLocation::kTombStone);
- LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- LegacyCasLog.Flush();
- }
- MigratedBlockCount++;
- MigratedChunkCount += MovedChunks.size();
- });
-
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return MigratedChunkCount;
-}
-
void
ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
{
@@ -1123,23 +824,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_Index.clear();
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
- fs::remove(LegacyLogPath);
- fs::remove(LegacyDataPath);
fs::remove(LogPath);
fs::remove(IndexPath);
fs::remove_all(m_BlocksBasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
- uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
CreateDirectories(m_BucketDir);
@@ -1161,7 +857,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ if (IsNew || LogEntryCount > 0)
{
MakeIndexSnapshot();
}
@@ -1309,6 +1005,7 @@ void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -1341,6 +1038,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
+ ++ChunkCount;
+ ChunkBytes += Loc.Size();
if (Loc.GetContentType() == ZenContentType::kBinary)
{
ExtendablePathBuilder<256> DataFilePath;
@@ -1381,6 +1080,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -1403,8 +1104,11 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
if (!Buffer)
{
BadKeys.push_back(Hash);
@@ -1422,40 +1126,41 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
_.ReleaseNow();
- if (BadKeys.empty())
- {
- return;
- }
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
-
- if (Ctx.RunRecovery())
+ if (!BadKeys.empty())
{
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const IoHash& BadKey : BadKeys)
- {
- // Log a tombstone and delete the in-memory index for the bad entry
+ // Deal with bad chunks by removing them from our lookup map
- const auto It = m_Index.find(BadKey);
- DiskLocation Location = It->second.Location;
- Location.Flags |= DiskLocation::kTombStone;
- LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
- m_Index.erase(BadKey);
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ for (const IoHash& BadKey : BadKeys)
+ {
+ // Log a tombstone and delete the in-memory index for the bad entry
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ m_Index.erase(BadKey);
+ }
}
+ m_SlogFile.Append(LogEntries);
}
- m_SlogFile.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCasChunks(BadKeys);
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -1517,7 +1222,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
if (Cids.size() > 1024)
{
- GcCtx.ContributeCids(Cids);
+ GcCtx.AddRetainedCids(Cids);
Cids.clear();
}
@@ -1552,8 +1257,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
}
- GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
+ GcCtx.AddRetainedCids(Cids);
+ GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
}
void
@@ -1601,7 +1306,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
std::vector<IoHash> DeleteCacheKeys;
DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
- GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
if (Keep)
{
return;
@@ -1752,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ChunkLocations.reserve(TotalChunkCount);
ChunkIndexToChunkHash.reserve(TotalChunkCount);
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
auto KeyIt = Index.find(ChunkHash);
const DiskLocation& DiskLocation = KeyIt->second.Location;
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
@@ -1836,7 +1541,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
},
[&]() { return GcCtx.CollectSmallObjects(); });
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.AddDeletedCids(DeletedChunks);
}
void
@@ -2302,7 +2007,7 @@ ZenCacheDiskLayer::TotalSize() const
static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
+ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Gc(Gc)
@@ -2313,7 +2018,6 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
DirectoryContent DirContent;
GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
- std::vector<std::string> LegacyBuckets;
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
@@ -2323,33 +2027,17 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
continue;
}
- LegacyBuckets.push_back(DirName);
}
- ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size());
+ ZEN_INFO("Found #{} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
{
// default (unspecified) and ue4-ddc namespace points to the same namespace instance
- ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
-
std::filesystem::path DefaultNamespaceFolder =
m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
CreateDirectories(DefaultNamespaceFolder);
-
- // Move any non-namespace folders into the default namespace folder
- for (const std::string& DirName : LegacyBuckets)
- {
- std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName;
- std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
- std::error_code Ec;
- std::filesystem::rename(LegacyFolder, NewPath, Ec);
- if (Ec)
- {
- ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message());
- }
- }
Namespaces.push_back(std::string(UE4DDCNamespaceName));
}
@@ -2537,7 +2225,7 @@ TEST_CASE("z$.store")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
@@ -2592,7 +2280,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -2612,7 +2300,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2635,7 +2323,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -2655,7 +2343,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2680,7 +2368,7 @@ TEST_CASE("z$.gc")
ScopedTemporaryDirectory TempDir;
std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
- const auto CollectAndFilter = [](CasGc& Gc,
+ const auto CollectAndFilter = [](GcManager& Gc,
GcClock::TimePoint Time,
GcClock::Duration MaxDuration,
std::span<const IoHash> Cids,
@@ -2693,7 +2381,7 @@ TEST_CASE("z$.gc")
};
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "teardrinker"sv;
@@ -2730,7 +2418,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
@@ -2751,7 +2439,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes standalone values")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2799,7 +2487,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes small objects")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2848,154 +2536,6 @@ TEST_CASE("z$.gc")
}
}
-TEST_CASE("z$.legacyconversion")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[] = {2041,
- 1123,
- 1223,
- 1239,
- 341,
- 1412,
- 912,
- 774,
- 341,
- 431,
- 554,
- 1098,
- 2048,
- 339 + 64 * 1024,
- 561 + 64 * 1024,
- 16 + 64 * 1024,
- 16 + 64 * 1024,
- 2048,
- 2048};
- size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
- size_t SingleBlockSize = 0;
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(ChunkCount);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
- SingleBlockSize += Size;
- }
-
- ZEN_UNUSED(SingleBlockSize);
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkCount);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CreateDirectories(TempDir.Path());
-
- const std::string Bucket = "rightintwo";
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
- const GcClock::TimePoint CurrentTime = GcClock::Now();
-
- for (size_t i = 0; i < ChunkCount; i++)
- {
- Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
- }
-
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcContext GcCtx(CurrentTime + std::chrono::hours(2));
- GcCtx.MaxCacheDuration(std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepChunks);
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
- }
- std::filesystem::path BucketDir = TempDir.Path() / Bucket;
- std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
-
- std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1);
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
- std::filesystem::remove(LegacyDataPath);
- std::filesystem::rename(CasPath, LegacyDataPath);
-
- std::vector<DiskIndexEntry> LogEntries;
- std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
- {
- LogEntries.resize(Header.EntryCount);
- ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- }
- }
- ObjectIndexFile.Close();
- std::filesystem::remove(IndexPath);
- }
-
- std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
- {
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
- }
- TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
- std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
- LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
-
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- uint64_t Size;
- uint64_t Offset;
- if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- Size = Entry.Location.Location.StandaloneSize;
- Offset = 0;
- }
- else
- {
- BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
- Size = Location.Size;
- Offset = Location.Offset;
- }
- LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
- LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
- LegacyLog.Append(LegacyEntry);
- }
- LegacyLog.Close();
-
- std::filesystem::remove_all(BlocksBaseDir);
- std::filesystem::remove(LogPath);
- std::filesystem::remove(IndexPath);
-
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
-
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- ZenCacheValue Value;
- CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
- CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
- }
- }
-}
-
TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
// for (uint32_t i = 0; i < 100; ++i)
@@ -3045,7 +2585,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
WorkerThreadPool ThreadPool(4);
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path());
{
@@ -3169,10 +2709,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -3217,10 +2757,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
}
{
@@ -3261,7 +2801,7 @@ TEST_CASE("z$.namespaces")
IoHash Key1;
IoHash Key2;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3286,7 +2826,7 @@ TEST_CASE("z$.namespaces")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3346,7 +2886,7 @@ TEST_CASE("z$.drop.bucket")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto Namespace = "mynamespace"sv;
@@ -3415,7 +2955,7 @@ TEST_CASE("z$.drop.namespace")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket1 = "teardrinker1"sv;
const auto Bucket2 = "teardrinker2"sv;
@@ -3480,7 +3020,7 @@ TEST_CASE("z$.blocked.disklayer.put")
return Writer.Save();
};
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -3517,6 +3057,96 @@ TEST_CASE("z$.blocked.disklayer.put")
CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
}
+TEST_CASE("z$.scrub")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ using namespace testutils;
+
+ struct CacheRecord
+ {
+ IoBuffer Record;
+ std::vector<CompressedBuffer> Attachments;
+ };
+
+ auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+ CacheRecord Result;
+ if (Structured)
+ {
+ Result.Attachments.resize(AttachmentSizes.size());
+ CbObjectWriter Record;
+ Record.BeginObject("Key"sv);
+ {
+ Record << "Bucket"sv << Bucket;
+ Record << "Hash"sv << Key;
+ }
+ Record.EndObject();
+ for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+ {
+ IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]);
+ CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+ Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash()));
+ Result.Attachments[Index] = CompressedAttachmentData;
+ }
+ Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+ Result.Record.SetContentType(ZenContentType::kCbObject);
+ }
+ else
+ {
+ std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+ size_t TotalSize = RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ TotalSize += AttachmentSize;
+ }
+ Result.Record = IoBuffer(TotalSize);
+ char* DataPtr = (char*)Result.Record.MutableData();
+ memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+ DataPtr += RecordData.length() + 1;
+ for (size_t AttachmentSize : AttachmentSizes)
+ {
+ IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize);
+ memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
+ DataPtr += AttachmentData.GetSize();
+ }
+ }
+ return Result;
+ };
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ auto CreateRecords =
+ [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+ for (const IoHash& Cid : Cids)
+ {
+ CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record});
+ for (const CompressedBuffer& Attachment : Record.Attachments)
+ {
+ CidStore.AddChunk(Attachment);
+ }
+ }
+ };
+
+ std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+ std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+ CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+ std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+ CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+
+ ScrubContext ScrubCtx;
+ Zcs.Scrub(ScrubCtx);
+ CidStore.Scrub(ScrubCtx);
+ CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+ CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
#endif
void
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index ea33a3c00..b81e44835 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -9,7 +9,6 @@
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
@@ -27,8 +26,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
class PathBuilderBase;
-class CasGc;
+class GcManager;
class ZenCacheTracker;
+class ScrubContext;
/******************************************************************************
@@ -327,7 +327,7 @@ private:
class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor
{
public:
- ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir);
+ ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir);
~ZenCacheNamespace();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -369,7 +369,7 @@ public:
bool AllowAutomaticCreationOfNamespaces = true;
};
- ZenCacheStore(CasGc& Gc, const Configuration& Configuration);
+ ZenCacheStore(GcManager& Gc, const Configuration& Configuration);
~ZenCacheStore();
bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -393,7 +393,7 @@ private:
NamespaceMap m_Namespaces;
std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
- CasGc& m_Gc;
+ GcManager& m_Gc;
Configuration m_Configuration;
};
diff --git a/zenserver/casstore.cpp b/zenserver/cidstore.cpp
index 872a40df8..5de347a17 100644
--- a/zenserver/casstore.cpp
+++ b/zenserver/cidstore.cpp
@@ -1,23 +1,25 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "casstore.h"
+#include "cidstore.h"
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zenstore/cidstore.h>
#include <gsl/gsl-lite.hpp>
namespace zen {
-HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
+HttpCidService::HttpCidService(CidStore& Store) : m_CidStore(Store)
{
- m_Router.AddPattern("cas", "([0-9A-Fa-f]{40})");
+ m_Router.AddPattern("cid", "([0-9A-Fa-f]{40})");
m_Router.RegisterRoute(
- "{cas}",
+ "{cid}",
[this](HttpRouterRequest& Req) {
IoHash Hash = IoHash::FromHexString(Req.GetCapture(1));
- ZEN_DEBUG("CAS request for {}", Hash);
+ ZEN_DEBUG("CID request for {}", Hash);
HttpServerRequest& ServerRequest = Req.ServerRequest();
@@ -26,7 +28,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
case HttpVerb::kGet:
case HttpVerb::kHead:
{
- if (IoBuffer Value = m_CasStore.FindChunk(Hash))
+ if (IoBuffer Value = m_CidStore.FindChunkByCid(Hash))
{
return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
@@ -37,8 +39,14 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (!Compressed)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::UnsupportedMediaType);
+ }
+
+ IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
// URI hash must match content hash
if (PayloadHash != Hash)
@@ -46,7 +54,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- m_CasStore.InsertChunk(Payload, PayloadHash);
+ m_CidStore.AddChunk(Compressed);
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -60,13 +68,13 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
}
const char*
-HttpCasService::BaseUri() const
+HttpCidService::BaseUri() const
{
- return "/cas/";
+ return "/cid/";
}
void
-HttpCasService::HandleRequest(zen::HttpServerRequest& Request)
+HttpCidService::HandleRequest(zen::HttpServerRequest& Request)
{
if (Request.RelativeUri().empty())
{
@@ -77,12 +85,18 @@ HttpCasService::HandleRequest(zen::HttpServerRequest& Request)
case HttpVerb::kPut:
case HttpVerb::kPost:
{
- IoBuffer Payload = Request.ReadPayload();
- IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ IoBuffer Payload = Request.ReadPayload();
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (!Compressed)
+ {
+ return Request.WriteResponse(HttpResponseCode::UnsupportedMediaType);
+ }
+
+ IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- ZEN_DEBUG("CAS POST request for {} ({} bytes)", PayloadHash, Payload.Size());
+ ZEN_DEBUG("CID POST request for {} ({} bytes)", PayloadHash, Payload.Size());
- auto InsertResult = m_CasStore.InsertChunk(Payload, PayloadHash);
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
if (InsertResult.New)
{
diff --git a/zenserver/casstore.h b/zenserver/cidstore.h
index 4ca6908b5..8e7832b35 100644
--- a/zenserver/casstore.h
+++ b/zenserver/cidstore.h
@@ -3,31 +3,32 @@
#pragma once
#include <zenhttp/httpserver.h>
-#include <zenstore/cas.h>
namespace zen {
/**
- * Simple CAS store HTTP endpoint
+ * Simple CID store HTTP endpoint
*
* Note that since this does not end up pinning any of the chunks it's only really useful for a small subset of use cases where you know a
- * chunk exists in the underlying CAS store. Thus it's mainly useful for internal use when communicating between Zen store instances
+ * chunk exists in the underlying CID store. Thus it's mainly useful for internal use when communicating between Zen store instances
*
- * Using this interface for adding CAS chunks makes little sense except for testing purposes as garbage collection may reap anything you add
+ * Using this interface for adding CID chunks makes little sense except for testing purposes as garbage collection may reap anything you add
* before anything ever gets to access it
*/
-class HttpCasService : public HttpService
+class CidStore;
+
+class HttpCidService : public HttpService
{
public:
- explicit HttpCasService(CasStore& Store);
- ~HttpCasService() = default;
+ explicit HttpCidService(CidStore& Store);
+ ~HttpCidService() = default;
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
private:
- CasStore& m_CasStore;
+ CidStore& m_CidStore;
HttpRequestRouter m_Router;
};
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index 171c67a6e..d7316ac64 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -17,7 +17,6 @@
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
# include <zencore/scopeguard.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include <span>
@@ -26,25 +25,22 @@ using namespace std::literals;
namespace zen {
-HttpFunctionService::HttpFunctionService(CasStore& Store,
- CidStore& InCidStore,
+HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
const CloudCacheClientOptions& ComputeOptions,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& ComputeAuthConfig,
const UpstreamAuthConfig& StorageAuthConfig,
AuthMgr& Mgr)
: m_Log(logging::Get("apply"))
-, m_CasStore(Store)
, m_CidStore(InCidStore)
{
- m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
+ m_UpstreamApply = UpstreamApply::Create({}, m_CidStore);
InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
ComputeAuthConfig,
StorageOptions,
StorageAuthConfig,
- m_CasStore,
m_CidStore,
Mgr);
m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
@@ -99,18 +95,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
// Determine which pieces are missing and need to be transmitted to populate CAS
- CasChunkSet ChunkSet;
+ HashKeySet ChunkSet;
FunctionSpec.IterateAttachments([&](CbFieldView Field) {
const IoHash Hash = Field.AsHash();
- ChunkSet.AddChunkToSet(Hash);
+ ChunkSet.AddHashToSet(Hash);
});
// Note that we store executables uncompressed to make it
// more straightforward and efficient to materialize them, hence
// the CAS lookup here instead of CID for the input payloads
- m_CasStore.FilterChunks(ChunkSet);
+ m_CidStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
{
@@ -127,7 +123,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
CbObjectWriter ResponseWriter;
ResponseWriter.BeginArray("need");
- ChunkSet.IterateChunks([&](const IoHash& Hash) {
+ ChunkSet.IterateHashes([&](const IoHash& Hash) {
ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
ResponseWriter.AddHash(Hash);
@@ -159,25 +155,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
- SharedBuffer Decompressed = DataView.Decompress();
- const uint64_t DecompressedSize = DataView.GetRawSize();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
ZEN_UNUSED(DataHash);
-
- TotalAttachmentBytes += DecompressedSize;
+ TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them
-
- const CasStore::InsertResult InsertResult =
- m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer);
if (InsertResult.New)
{
- TotalNewBytes += DecompressedSize;
+ TotalNewBytes += Buffer.GetCompressedSize();
++NewAttachmentCount;
}
}
diff --git a/zenserver/compute/function.h b/zenserver/compute/function.h
index efabe96ee..650cee757 100644
--- a/zenserver/compute/function.h
+++ b/zenserver/compute/function.h
@@ -20,7 +20,6 @@
namespace zen {
-class CasStore;
class CidStore;
class UpstreamApply;
class CloudCacheClient;
@@ -35,8 +34,7 @@ struct CloudCacheClientOptions;
class HttpFunctionService : public HttpService
{
public:
- HttpFunctionService(CasStore& Store,
- CidStore& InCidStore,
+ HttpFunctionService(CidStore& InCidStore,
const CloudCacheClientOptions& ComputeOptions,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& ComputeAuthConfig,
@@ -52,7 +50,6 @@ private:
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
- CasStore& m_CasStore;
CidStore& m_CidStore;
std::unique_ptr<UpstreamApply> m_UpstreamApply;
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 1853941ed..e42704ccf 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -16,8 +16,8 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include <zenstore/scrubcontext.h>
#include "config.h"
@@ -350,7 +350,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
Hashes.push_back(Kv.second);
}
- GcCtx.ContributeCids(Hashes);
+ GcCtx.AddRetainedCids(Hashes);
Hashes.clear();
@@ -359,7 +359,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
Hashes.push_back(Kv.second);
}
- GcCtx.ContributeCids(Hashes);
+ GcCtx.AddRetainedCids(Hashes);
}
bool
@@ -872,7 +872,7 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
: GcContributor(Gc)
, m_Log(zen::logging::Get("project"))
, m_CidStore(Store)
@@ -1482,7 +1482,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
const IoHash FileHash = Entry.AsHash();
- if (!m_CidStore.FindChunkByCid(FileHash))
+ if (!m_CidStore.ContainsChunk(FileHash))
{
ZEN_DEBUG("prep - NEED: {}", FileHash);
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index 6a8730ee2..a15556cfa 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -6,8 +6,6 @@
#include <zencore/uid.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpserver.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
@@ -65,7 +63,7 @@ class ProjectStore : public RefCounted, public GcContributor
struct OplogStorage;
public:
- ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc);
~ProjectStore();
struct Project;
diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp
index 1236e6adb..0e46fff94 100644
--- a/zenserver/testing/launch.cpp
+++ b/zenserver/testing/launch.cpp
@@ -6,13 +6,14 @@
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
+# include <zencore/compress.h>
# include <zencore/filesystem.h>
# include <zencore/fmtutils.h>
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
# include <zencore/logging.h>
# include <zencore/windows.h>
-# include <zenstore/cas.h>
+# include <zenstore/cidstore.h>
ZEN_THIRD_PARTY_INCLUDES_START
# include <AccCtrl.h>
@@ -322,9 +323,9 @@ SandboxedJob::SpawnJob(std::filesystem::path ExePath)
////////////////////////////////////////////////////////////////////////////////
-HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir)
+HttpLaunchService::HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir)
: m_Log(logging::Get("exec"))
-, m_CasStore(Store)
+, m_CidStore(Store)
, m_SandboxPath(SandboxBaseDir)
{
m_Router.AddPattern("job", "([[:digit:]]+)");
@@ -402,7 +403,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
const IoHash FileHash = Ob["hash"sv].AsHash();
- if (!m_CasStore.FindChunk(FileHash))
+ if (!m_CidStore.FindChunkByCid(FileHash))
{
ZEN_DEBUG("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64());
@@ -465,7 +466,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
const IoHash FileHash = Ob["hash"sv].AsHash();
uint64_t FileSize = Ob["size"sv].AsUInt64();
- if (IoBuffer Chunk = m_CasStore.FindChunk(FileHash); !Chunk)
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(FileHash); !Chunk)
{
ZEN_DEBUG("MISSING: {} {} {}", FileHash, FileName, FileSize);
AllOk = false;
@@ -476,9 +477,18 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
{
std::filesystem::path FullPath = SandboxDir / FileName;
- const IoBuffer* Chunks[] = {&Chunk};
-
- zen::WriteFile(FullPath, Chunks, 1);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CompositeBuffer CompositeBuffer = Compressed.DecompressToComposite();
+ std::span<const SharedBuffer> Segments = CompositeBuffer.GetSegments();
+ std::vector<IoBuffer> Chunks(Segments.size());
+ std::vector<IoBuffer*> ChunkPtrs(Segments.size());
+ for (size_t Index = 0; Index < Segments.size(); ++Index)
+ {
+ Chunks[Index] = std::move(Segments[Index].AsIoBuffer());
+ ChunkPtrs[Index] = &Chunks[Index];
+ }
+
+ zen::WriteFile(FullPath, ChunkPtrs.data(), ChunkPtrs.size());
}
}
diff --git a/zenserver/testing/launch.h b/zenserver/testing/launch.h
index 6fd3e39ae..f44618bfb 100644
--- a/zenserver/testing/launch.h
+++ b/zenserver/testing/launch.h
@@ -17,7 +17,7 @@
namespace zen {
-class CasStore;
+class CidStore;
/**
* Process launcher for test executables
@@ -25,7 +25,7 @@ class CasStore;
class HttpLaunchService : public HttpService
{
public:
- HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir);
+ HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir);
~HttpLaunchService();
virtual const char* BaseUri() const override;
@@ -36,7 +36,7 @@ private:
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
- CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::filesystem::path m_SandboxPath;
std::atomic<int> m_SandboxCount{0};
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 38c798569..22b06d9c4 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -17,7 +17,6 @@
# include <zencore/timer.h>
# include <zencore/workthreadpool.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include <auth/authmgr.h>
@@ -49,11 +48,9 @@ namespace detail {
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr)
: m_Log(logging::Get("upstream-apply"))
- , m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_AuthMgr(Mgr)
{
@@ -341,7 +338,7 @@ namespace detail {
}
else
{
- DataBuffer = m_CasStore.FindChunk(Hash);
+ DataBuffer = m_CidStore.FindChunkByCid(Hash);
if (!DataBuffer)
{
Log().warn("Put blob FAILED, input chunk '{}' missing", Hash);
@@ -676,7 +673,6 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- CasStore& m_CasStore;
CidStore& m_CidStore;
AuthMgr& m_AuthMgr;
std::string m_DisplayName;
@@ -1218,7 +1214,7 @@ namespace detail {
const IoHash ChunkId = FileEntry["hash"sv].AsHash();
const uint64_t Size = FileEntry["size"sv].AsUInt64();
- if (!m_CasStore.ContainsChunk(ChunkId))
+ if (!m_CidStore.ContainsChunk(ChunkId))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
return false;
@@ -1443,7 +1439,6 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr)
{
@@ -1451,11 +1446,10 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput
ComputeAuthConfig,
StorageOptions,
StorageAuthConfig,
- CasStore,
CidStore,
Mgr);
}
} // namespace zen
-#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index c397bb141..c719b225d 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -11,7 +11,6 @@
# include <zencore/timer.h>
# include <zencore/workthreadpool.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include "diag/logging.h"
@@ -77,10 +76,9 @@ struct UpstreamApplyStats
class UpstreamApplyImpl final : public UpstreamApply
{
public:
- UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore)
: m_Log(logging::Get("upstream-apply"))
, m_Options(Options)
- , m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_Stats(Options.StatsEnabled)
, m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
@@ -429,7 +427,6 @@ private:
spdlog::logger& m_Log;
UpstreamApplyOptions m_Options;
- CasStore& m_CasStore;
CidStore& m_CidStore;
UpstreamApplyStats m_Stats;
UpstreamApplyTasks m_ApplyTasks;
@@ -452,9 +449,9 @@ UpstreamApply::IsHealthy() const
}
std::unique_ptr<UpstreamApply>
-UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore)
{
- return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore);
+ return std::make_unique<UpstreamApplyImpl>(Options, CidStore);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index 1deaf00a5..4a095be6c 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -18,7 +18,6 @@
namespace zen {
class AuthMgr;
-class CasStore;
class CbObjectWriter;
class CidStore;
class CloudCacheTokenProvider;
@@ -153,7 +152,6 @@ public:
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr);
};
@@ -186,7 +184,7 @@ public:
virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
- static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+ static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CidStore& CidStore);
};
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 98b4439c7..7d1f72004 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -15,7 +15,6 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
#include <auth/authmgr.h>
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 801a98523..31ca8851a 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -17,8 +17,8 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/websocket.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_PLATFORM_WINDOWS
@@ -56,7 +56,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
//////////////////////////////////////////////////////////////////////////
-#include "casstore.h"
#include "config.h"
#include "diag/logging.h"
@@ -103,6 +102,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "auth/authservice.h"
#include "cache/structuredcache.h"
#include "cache/structuredcachestore.h"
+#include "cidstore.h"
#include "compute/function.h"
#include "diag/diagsvcs.h"
#include "experimental/usnjournal.h"
@@ -253,17 +253,16 @@ public:
ZEN_INFO("initializing storage");
- zen::CasStoreConfiguration Config;
+ zen::CidStoreConfiguration Config;
Config.RootDirectory = m_DataRoot / "cas";
- m_CasStore->Initialize(Config);
-
- m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
- m_CasGc.SetCidStore(m_CidStore.get());
+ m_CidStore = std::make_unique<zen::CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+ m_CidService.reset(new zen::HttpCidService{*m_CidStore});
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc);
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore});
#if ZEN_WITH_EXEC_SERVICES
@@ -274,7 +273,7 @@ public:
std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
zen::CreateDirectories(SandboxDir);
- m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+ m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CidStore, SandboxDir);
}
else
{
@@ -328,7 +327,7 @@ public:
m_Http->RegisterService(*m_HttpProjectService);
}
- m_Http->RegisterService(m_CasService);
+ m_Http->RegisterService(*m_CidService);
#if ZEN_WITH_EXEC_SERVICES
if (ServerOptions.ExecServiceEnabled)
@@ -522,7 +521,6 @@ public:
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
- m_CasStore->Scrub(Ctx);
m_CidStore->Scrub(Ctx);
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
@@ -538,9 +536,6 @@ public:
void Flush()
{
- if (m_CasStore)
- m_CasStore->Flush();
-
if (m_CidStore)
m_CidStore->Flush();
@@ -602,14 +597,13 @@ private:
std::unique_ptr<zen::HttpAuthService> m_AuthService;
zen::HttpStatusService m_StatusService;
zen::HttpStatsService m_StatsService;
- zen::CasGc m_CasGc;
- zen::GcScheduler m_GcScheduler{m_CasGc};
- std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_CasGc)};
+ zen::GcManager m_GcManager;
+ zen::GcScheduler m_GcScheduler{m_GcManager};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
zen::HttpTestService m_TestService;
zen::HttpTestingService m_TestingService;
- zen::HttpCasService m_CasService{*m_CasStore};
+ std::unique_ptr<zen::HttpCidService> m_CidService;
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
@@ -745,7 +739,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ZEN_INFO("instantiating structured cache service");
m_CacheStore = std::make_unique<ZenCacheStore>(
- m_CasGc,
+ m_GcManager,
ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true});
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
@@ -873,8 +867,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
.OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
.AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
- m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore,
- *m_CidStore,
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore,
ComputeOptions,
StorageOptions,
ComputeAuthConfig,
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index 0e1d5b242..54e8cb11c 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
+#include "cas.h"
#include "compactcas.h"
#include "filecas.h"
@@ -18,7 +18,9 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zencore/uid.h>
+#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
#include <gsl/gsl-lite.hpp>
@@ -30,58 +32,6 @@
namespace zen {
-void
-CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
-{
- m_ChunkSet.insert(HashToAdd);
-}
-
-void
-CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd)
-{
- m_ChunkSet.insert(HashesToAdd.begin(), HashesToAdd.end());
-}
-
-void
-CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
-{
- for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
- {
- if (Predicate(*It))
- {
- It = m_ChunkSet.erase(It);
- }
- else
- {
- ++It;
- }
- }
-}
-
-void
-CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
-{
- for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It)
- {
- Callback(*It);
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
-{
- m_BadCas.AddChunksToSet(BadCasChunks);
-}
-
-void
-ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
-{
- m_ChunkCount.fetch_add(ChunkCount);
- m_ByteCount.fetch_add(ChunkBytes);
-}
-
/**
* CAS store implementation
*
@@ -93,18 +43,18 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
class CasImpl : public CasStore
{
public:
- CasImpl(CasGc& Gc);
+ CasImpl(GcManager& Gc);
virtual ~CasImpl();
- virtual void Initialize(const CasStoreConfiguration& InConfig) override;
+ virtual void Initialize(const CidStoreConfiguration& InConfig) override;
virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override;
virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
virtual bool ContainsChunk(const IoHash& ChunkHash) override;
- virtual void FilterChunks(CasChunkSet& InOutChunks) override;
+ virtual void FilterChunks(HashKeySet& InOutChunks) override;
virtual void Flush() override;
virtual void Scrub(ScrubContext& Ctx) override;
virtual void GarbageCollect(GcContext& GcCtx) override;
- virtual CasStoreSize TotalSize() const override;
+ virtual CidStoreSize TotalSize() const override;
private:
CasContainerStrategy m_TinyStrategy;
@@ -124,7 +74,7 @@ private:
void UpdateManifest();
};
-CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc)
+CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc)
{
}
@@ -133,7 +83,7 @@ CasImpl::~CasImpl()
}
void
-CasImpl::Initialize(const CasStoreConfiguration& InConfig)
+CasImpl::Initialize(const CidStoreConfiguration& InConfig)
{
m_Config = InConfig;
@@ -149,9 +99,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
// Initialize payload storage
- m_LargeStrategy.Initialize(IsNewStore);
- m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
- m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
+ m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore);
+ m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
+ m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
}
bool
@@ -292,7 +242,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash)
}
void
-CasImpl::FilterChunks(CasChunkSet& InOutChunks)
+CasImpl::FilterChunks(HashKeySet& InOutChunks)
{
m_SmallStrategy.FilterChunks(InOutChunks);
m_TinyStrategy.FilterChunks(InOutChunks);
@@ -330,7 +280,7 @@ CasImpl::GarbageCollect(GcContext& GcCtx)
m_LargeStrategy.CollectGarbage(GcCtx);
}
-CasStoreSize
+CidStoreSize
CasImpl::TotalSize() const
{
const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize;
@@ -343,7 +293,7 @@ CasImpl::TotalSize() const
//////////////////////////////////////////////////////////////////////////
std::unique_ptr<CasStore>
-CreateCasStore(CasGc& Gc)
+CreateCasStore(GcManager& Gc)
{
return std::make_unique<CasImpl>(Gc);
}
@@ -359,10 +309,10 @@ TEST_CASE("CasStore")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration config;
+ CidStoreConfiguration config;
config.RootDirectory = TempDir.Path();
- CasGc Gc;
+ GcManager Gc;
std::unique_ptr<CasStore> Store = CreateCasStore(Gc);
Store->Initialize(config);
@@ -382,9 +332,9 @@ TEST_CASE("CasStore")
CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2);
CHECK(Result2.New);
- CasChunkSet ChunkSet;
- ChunkSet.AddChunkToSet(Hash1);
- ChunkSet.AddChunkToSet(Hash2);
+ HashKeySet ChunkSet;
+ ChunkSet.AddHashToSet(Hash1);
+ ChunkSet.AddHashToSet(Hash2);
Store->FilterChunks(ChunkSet);
CHECK(ChunkSet.IsEmpty());
diff --git a/zenstore/cas.h b/zenstore/cas.h
new file mode 100644
index 000000000..2ad160d28
--- /dev/null
+++ b/zenstore/cas.h
@@ -0,0 +1,61 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/blake3.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/refcount.h>
+#include <zencore/timer.h>
+#include <zenstore/cidstore.h>
+#include <zenstore/hashkeyset.h>
+
+#include <atomic>
+#include <filesystem>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+namespace zen {
+
+class GcContext;
+class GcManager;
+class ScrubContext;
+
+/** Content Addressable Storage interface
+
+ */
+
+class CasStore
+{
+public:
+ virtual ~CasStore() = default;
+
+ const CidStoreConfiguration& Config() { return m_Config; }
+
+ struct InsertResult
+ {
+ bool New = false;
+ };
+
+ virtual void Initialize(const CidStoreConfiguration& Config) = 0;
+ virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual void Flush() = 0;
+ virtual void Scrub(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
+ virtual CidStoreSize TotalSize() const = 0;
+
+protected:
+ CidStoreConfiguration m_Config;
+ uint64_t m_LastScrubTime = 0;
+};
+
+ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
+
+void CAS_forcelink();
+
+} // namespace zen
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 03a56f010..9c5258bce 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
#include "compactcas.h"
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 01eda4697..5a079dbed 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -7,8 +7,9 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/string.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
+#include <zenstore/scrubcontext.h>
+
+#include "cas.h"
#include <filesystem>
@@ -18,153 +19,32 @@ struct CidStore::Impl
{
Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
- struct IndexEntry
- {
- IoHash Uncompressed;
- IoHash Compressed;
- };
-
- CasStore& m_CasStore;
- TCasLogFile<IndexEntry> m_LogFile;
+ CasStore& m_CasStore;
- RwLock m_Lock;
- tsl::robin_map<IoHash, IoHash> m_CidMap;
+ void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
- CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData)
{
const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
- IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
Payload.SetContentType(ZenContentType::kCompressedBinary);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash);
- AddCompressedCid(DecompressedId, CompressedHash);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId);
- return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New};
+ return {.New = Result.New};
}
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
- {
- ZEN_ASSERT(Compressed != IoHash::Zero);
-
- RwLock::ExclusiveLockScope _(m_Lock);
+ IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); }
- auto It = m_CidMap.try_emplace(DecompressedId, Compressed);
- if (!It.second)
- {
- if (It.first.value() != Compressed)
- {
- It.first.value() = Compressed;
- }
- else
- {
- // No point logging an update that won't change anything
- return;
- }
- }
+ bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); }
- // It's not ideal to do this while holding the lock in case
- // we end up in blocking I/O but that's for later
- LogMapping(DecompressedId, Compressed);
- }
-
- void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash)
+ void FilterChunks(HashKeySet& InOutChunks)
{
- ZEN_ASSERT(DecompressedId != CompressedHash);
- m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash});
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); });
}
- IoHash RemapCid(const IoHash& DecompressedId)
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- return It->second;
- }
-
- return IoHash::Zero;
- }
-
- IoBuffer FindChunkByCid(const IoHash& DecompressedId)
- {
- IoHash CompressedHash;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CompressedHash = It->second;
- }
- else
- {
- return {};
- }
- }
-
- ZEN_ASSERT(CompressedHash != IoHash::Zero);
-
- return m_CasStore.FindChunk(CompressedHash);
- }
-
- bool ContainsChunk(const IoHash& DecompressedId)
- {
- IoHash CasHash = IoHash::Zero;
-
- {
- RwLock::SharedLockScope _(m_Lock);
- if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
- {
- CasHash = It->second;
- }
- }
-
- return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false;
- }
-
- void InitializeIndex(const std::filesystem::path& RootDir)
- {
- CreateDirectories(RootDir);
- std::filesystem::path SlogPath{RootDir / "cid.slog"};
-
- bool IsNew = !std::filesystem::exists(SlogPath);
-
- m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
-
- ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize()));
-
- uint64_t TombstoneCount = 0;
- uint64_t InvalidCount = 0;
-
- m_LogFile.Replay(
- [&](const IndexEntry& Entry) {
- if (Entry.Compressed != IoHash::Zero)
- {
- // Update
- m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
- }
- else
- {
- if (Entry.Uncompressed != IoHash::Zero)
- {
- // Tombstone
- m_CidMap.erase(Entry.Uncompressed);
- ++TombstoneCount;
- }
- else
- {
- // Completely uninitialized entry with both hashes set to zero indicates a
- // problem. Might be an unwritten page due to BSOD or some other problem
- ++InvalidCount;
- }
- }
- },
- 0);
-
- ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
- }
-
- void Flush() { m_LogFile.Flush(); }
+ void Flush() { m_CasStore.Flush(); }
void Scrub(ScrubContext& Ctx)
{
@@ -175,83 +55,7 @@ struct CidStore::Impl
m_LastScrubTime = Ctx.ScrubTimestamp();
- CasChunkSet ChunkSet;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto& Kv : m_CidMap)
- {
- ChunkSet.AddChunkToSet(Kv.second);
- }
- }
-
- m_CasStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- // All good - we have all the chunks
- return;
- }
-
- ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed",
- ChunkSet.GetSize(),
- m_CidMap.size());
-
- // Erase all mappings to chunks which are not present in the underlying CAS store
- // we do this by removing mappings from the in-memory lookup structure and also
- // by emitting tombstone records to the commit log
-
- std::vector<IoHash> BadChunks;
-
- {
- RwLock::SharedLockScope _(m_Lock);
-
- for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;)
- {
- if (ChunkSet.ContainsChunk(It->second))
- {
- const IoHash& BadHash = It->first;
-
- // Log a tombstone record
- LogMapping(BadHash, IoHash::Zero);
-
- BadChunks.push_back(BadHash);
-
- It = m_CidMap.erase(It);
- }
- else
- {
- ++It;
- }
- }
- }
-
- m_LogFile.Flush();
-
- // TODO: Should compute a snapshot index here
-
- Ctx.ReportBadCasChunks(BadChunks);
- }
-
- void RemoveCids(CasChunkSet& CasChunks)
- {
- std::vector<IndexEntry> RemovedEntries;
- RemovedEntries.reserve(CasChunks.GetSize());
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;)
- {
- if (CasChunks.ContainsChunk(It->second))
- {
- RemovedEntries.push_back({It->first, IoHash::Zero});
- It = m_CidMap.erase(It);
- continue;
- }
- ++It;
- }
- }
- m_LogFile.Append(RemovedEntries);
+ m_CasStore.Scrub(Ctx);
}
uint64_t m_LastScrubTime = 0;
@@ -259,25 +63,24 @@ struct CidStore::Impl
//////////////////////////////////////////////////////////////////////////
-CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore))
+CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore))
{
- m_Impl->InitializeIndex(RootDir);
}
CidStore::~CidStore()
{
}
-CidStore::InsertResult
-CidStore::AddChunk(CompressedBuffer& ChunkData)
+void
+CidStore::Initialize(const CidStoreConfiguration& Config)
{
- return m_Impl->AddChunk(ChunkData);
+ m_Impl->Initialize(Config);
}
-void
-CidStore::AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
+CidStore::InsertResult
+CidStore::AddChunk(const CompressedBuffer& ChunkData)
{
- m_Impl->AddCompressedCid(DecompressedId, Compressed);
+ return m_Impl->AddChunk(ChunkData);
}
IoBuffer
@@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId)
return m_Impl->FindChunkByCid(DecompressedId);
}
-IoHash
-CidStore::RemapCid(const IoHash& DecompressedId)
-{
- return m_Impl->RemapCid(DecompressedId);
-}
-
bool
CidStore::ContainsChunk(const IoHash& DecompressedId)
{
@@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId)
}
void
-CidStore::Flush()
+CidStore::FilterChunks(HashKeySet& InOutChunks)
{
- m_Impl->Flush();
+ return m_Impl->FilterChunks(InOutChunks);
}
void
-CidStore::Scrub(ScrubContext& Ctx)
+CidStore::Flush()
{
- m_Impl->Scrub(Ctx);
+ m_Impl->Flush();
}
void
-CidStore::RemoveCids(CasChunkSet& CasChunks)
+CidStore::Scrub(ScrubContext& Ctx)
{
- m_Impl->RemoveCids(CasChunks);
+ m_Impl->Scrub(Ctx);
}
-CasStoreSize
-CidStore::CasSize() const
+CidStoreSize
+CidStore::TotalSize() const
{
return m_Impl->m_CasStore.TotalSize();
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5aed02e7f..a7fdfa1f5 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -2,13 +2,16 @@
#include "compactcas.h"
-#include <zenstore/cas.h>
+#include "cas.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zenstore/scrubcontext.h>
+
#include <gsl/gsl-lite.hpp>
#include <xxhash.h>
@@ -76,94 +79,6 @@ namespace {
return GetBasePath(RootPath, ContainerBaseName) / "blocks";
}
- std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + LogExtension);
- }
-
- std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + ".ucas");
- }
-
- std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
- {
- return RootPath / (ContainerBaseName + IndexExtension);
- }
-
- struct LegacyCasDiskLocation
- {
- LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize)
- {
- ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
- ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
-
- memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
- memcpy(&m_Size[0], &InSize, sizeof m_Size);
- }
-
- LegacyCasDiskLocation() = default;
-
- inline uint64_t GetOffset() const
- {
- uint64_t Offset = 0;
- memcpy(&Offset, &m_Offset, sizeof m_Offset);
- return Offset;
- }
-
- inline uint64_t GetSize() const
- {
- uint64_t Size = 0;
- memcpy(&Size, &m_Size, sizeof m_Size);
- return Size;
- }
-
- private:
- uint8_t m_Offset[5];
- uint8_t m_Size[5];
- };
-
- struct LegacyCasDiskIndexEntry
- {
- static const uint8_t kTombstone = 0x01;
-
- IoHash Key;
- LegacyCasDiskLocation Location;
- ZenContentType ContentType = ZenContentType::kUnknownContentType;
- uint8_t Flags = 0;
- };
-
- bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0)
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- return true;
- }
- if (Entry.ContentType != ZenContentType::kUnknownContentType)
- {
- OutReason =
- fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
- return false;
- }
- uint64_t Size = Entry.Location.GetSize();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -199,10 +114,7 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
-: GcStorage(Gc)
-, m_Config(Config)
-, m_Log(logging::Get("containercas"))
+CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas"))
{
}
@@ -211,16 +123,21 @@ CasContainerStrategy::~CasContainerStrategy()
}
void
-CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
+CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint64_t Alignment,
+ bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
ZEN_ASSERT(MaxBlockSize > 0);
+ m_RootDirectory = RootDirectory;
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
m_MaxBlockSize = MaxBlockSize;
- m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName);
OpenContainer(IsNewStore);
@@ -267,6 +184,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
CasStore::InsertResult
CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
@@ -293,7 +213,7 @@ CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
}
void
-CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
+CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
{
// This implementation is good enough for relatively small
// chunk sets (in terms of chunk identifiers), but would
@@ -302,7 +222,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
// we're likely to already have a large proportion of the
// chunks in the set
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -316,6 +236,7 @@ void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -337,6 +258,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -344,66 +268,97 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
BadKeys.push_back(Hash);
return;
}
- const IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
- if (ComputedHash != Hash)
+
+ IoBuffer Buffer(IoBuffer::Wrap, Data, Size);
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
+ IoHash ComputedHash = IoHash::HashBuffer(Data, Size);
+ if (ComputedHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
return;
}
+#endif
+ BadKeys.push_back(Hash);
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
+
+ const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ // TODO: Add API to verify compressed buffer without having to memorymap the whole file
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadKeys.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
IoHashStream Hasher;
- File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
- IoHash ComputedHash = Hasher.GetHash();
- const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- if (ComputedHash != Hash)
+ File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+ if (ComputedHash == Hash)
{
- // Hash mismatch
- BadKeys.push_back(Hash);
return;
}
+#endif
+ BadKeys.push_back(Hash);
};
m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk);
_.ReleaseNow();
- if (BadKeys.empty())
- {
- return;
- }
-
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName);
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- if (Ctx.RunRecovery())
+ if (!BadKeys.empty())
{
- // Deal with bad chunks by removing them from our lookup map
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
- for (const IoHash& ChunkHash : BadKeys)
+ // Deal with bad chunks by removing them from our lookup map
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
{
- const auto KeyIt = m_LocationMap.find(ChunkHash);
- if (KeyIt == m_LocationMap.end())
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ for (const IoHash& ChunkHash : BadKeys)
{
- // Might have been GC'd
- continue;
+ const auto KeyIt = m_LocationMap.find(ChunkHash);
+ if (KeyIt == m_LocationMap.end())
+ {
+ // Might have been GC'd
+ continue;
+ }
+ LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(KeyIt);
}
- LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(KeyIt);
}
+ m_CasLog.Append(LogEntries);
}
- m_CasLog.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCasChunks(BadKeys);
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -432,7 +387,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// do a blocking operation and update the m_LocationMap after each new block is
// written and figuring out the path to the next new block.
- ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
uint64_t WriteBlockTimeUs = 0;
uint64_t WriteBlockLongestTimeUs = 0;
@@ -468,7 +423,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
ChunkLocations.reserve(TotalChunkCount);
ChunkIndexToChunkHash.reserve(TotalChunkCount);
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
auto KeyIt = LocationMap.find(ChunkHash);
const BlockStoreDiskLocation& DiskLocation = KeyIt->second;
BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
@@ -539,26 +494,26 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
},
[&GcCtx]() { return GcCtx.CollectSmallObjects(); });
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.AddDeletedCids(DeletedChunks);
}
void
CasContainerStrategy::MakeIndexSnapshot()
{
- ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ ZEN_INFO("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName);
uint64_t EntryCount = 0;
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
EntryCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
namespace fs = std::filesystem;
- fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
+ fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName);
// Move index away, we keep it if something goes wrong
if (fs::is_regular_file(TempIndexPath))
@@ -629,13 +584,13 @@ uint64_t
CasContainerStrategy::ReadIndexFile()
{
std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(IndexPath))
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' index containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
Entries.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -682,13 +637,13 @@ uint64_t
CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
{
std::vector<CasDiskIndexEntry> Entries;
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
if (std::filesystem::is_regular_file(LogPath))
{
Stopwatch Timer;
const auto _ = MakeGuard([&] {
ZEN_INFO("read store '{}' log containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
+ m_RootDirectory / m_ContainerBaseName,
Entries.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
@@ -727,208 +682,6 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
return 0;
}
-uint64_t
-CasContainerStrategy::MigrateLegacyData(bool CleanSource)
-{
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
- {
- return 0;
- }
-
- ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName);
-
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
- std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- m_Config.RootDirectory / m_ContainerBaseName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- uint64_t BlockFileSize = 0;
- {
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();
- }
-
- std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
- uint64_t InvalidEntryCount = 0;
-
- TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyCasDiskIndexEntry& Record) {
- std::string InvalidEntryReason;
- if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- InvalidEntryCount++;
- return;
- }
- LegacyDiskIndex.insert_or_assign(Record.Key, Record);
- },
- 0);
-
- std::vector<IoHash> BadEntries;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskIndexEntry& Record(Entry.second);
- if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize)
- {
- continue;
- }
- ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
- BadEntries.push_back(Entry.first);
- }
- for (const IoHash& BadHash : BadEntries)
- {
- LegacyDiskIndex.erase(BadHash);
- }
- InvalidEntryCount += BadEntries.size();
- }
- }
-
- if (InvalidEntryCount)
- {
- ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName);
- }
-
- if (LegacyDiskIndex.empty())
- {
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
- }
- return 0;
- }
-
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- CreateDirectories(LogPath.parent_path());
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size());
- ChunkLocations.reserve(LegacyDiskIndex.size());
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyCasDiskLocation& Location = Entry.second.Location;
- const IoHash& ChunkHash = Entry.first;
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()});
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- TotalSize += Location.GetSize();
- }
- m_BlockStore.Split(
- ChunkLocations,
- LegacyDataPath,
- m_BlocksBasePath,
- m_MaxBlockSize,
- BlockStoreDiskLocation::MaxBlockIndex + 1,
- m_PayloadAlignment,
- CleanSource,
- [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
- const BlockStore::MovedChunksArray& MovedChunks) {
- std::vector<CasDiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- LogEntries.push_back({.Key = ChunkHash,
- .Location = {NewLocation, m_PayloadAlignment},
- .ContentType = OldEntry.ContentType,
- .Flags = OldEntry.Flags});
- }
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
- }
- CasLog.Append(LogEntries);
- CasLog.Flush();
- if (CleanSource)
- {
- std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- LegacyLogEntries.push_back(
- LegacyCasDiskIndexEntry{.Key = ChunkHash,
- .Location = OldEntry.Location,
- .ContentType = OldEntry.ContentType,
- .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- LegacyCasLog.Flush();
- }
- MigratedBlockCount++;
- MigratedChunkCount += MovedChunks.size();
- });
-
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
- // a CAS manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySidx;
- LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
- }
- return MigratedChunkCount;
-}
-
void
CasContainerStrategy::OpenContainer(bool IsNewStore)
{
@@ -937,25 +690,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_LocationMap.clear();
- std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName);
if (IsNewStore)
{
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
-
- std::filesystem::remove(LegacyLogPath);
- std::filesystem::remove(LegacyDataPath);
std::filesystem::remove_all(BasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
- uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
CreateDirectories(BasePath);
- std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName);
m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
std::vector<BlockStoreLocation> KnownLocations;
@@ -969,7 +716,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ if (IsNewStore || (LogEntryCount > 0))
{
MakeIndexSnapshot();
}
@@ -1040,18 +787,14 @@ TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
const int kIterationCount = 1000;
std::vector<IoHash> Keys(kIterationCount);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -1083,9 +826,9 @@ TEST_CASE("compactcas.compact.gc")
// the original cas store
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -1109,18 +852,13 @@ TEST_CASE("compactcas.compact.totalsize")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
-
- CreateDirectories(CasConfig.RootDirectory);
-
const uint64_t kChunkSize = 1024;
const int32_t kChunkCount = 16;
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, true);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
@@ -1135,9 +873,9 @@ TEST_CASE("compactcas.compact.totalsize")
}
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
@@ -1145,9 +883,9 @@ TEST_CASE("compactcas.compact.totalsize")
// Re-open again, this time we should have a snapshot
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 65536, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 65536, 16, false);
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
@@ -1159,13 +897,9 @@ TEST_CASE("compactcas.gc.basic")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
@@ -1186,16 +920,12 @@ TEST_CASE("compactcas.gc.removefile")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true);
const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
CHECK(InsertResult.New);
@@ -1204,9 +934,9 @@ TEST_CASE("compactcas.gc.removefile")
Cas.Flush();
}
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 65536, 1 << 4, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false);
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
@@ -1222,13 +952,9 @@ TEST_CASE("compactcas.gc.compact")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("cb", 2048, 1 << 4, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true);
uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
std::vector<IoBuffer> Chunks;
@@ -1275,7 +1001,7 @@ TEST_CASE("compactcas.gc.compact")
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[0]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1308,7 +1034,7 @@ TEST_CASE("compactcas.gc.compact")
GcCtx.CollectSmallObjects(true);
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1342,7 +1068,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[1]);
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1377,7 +1103,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[7]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1414,7 +1140,7 @@ TEST_CASE("compactcas.gc.compact")
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1476,13 +1202,10 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
{
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
for (size_t i = 0; i < 20; i++)
{
@@ -1498,7 +1221,7 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
{
KeepChunks.push_back(ChunkHashes[i]);
}
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
Cas.Flush();
Cas.CollectGarbage(GcCtx);
@@ -1513,9 +1236,9 @@ TEST_CASE("compactcas.gc.deleteblockonopen")
}
{
// Re-open
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, false);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, false);
for (size_t i = 0; i < 20; i += 2)
{
@@ -1545,13 +1268,9 @@ TEST_CASE("compactcas.gc.handleopeniobuffer")
ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
}
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 1024, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 1024, 16, true);
for (size_t i = 0; i < 20; i++)
{
@@ -1574,131 +1293,12 @@ TEST_CASE("compactcas.gc.handleopeniobuffer")
CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk));
}
-TEST_CASE("compactcas.legacyconversion")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048};
- size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
- size_t SingleBlockSize = 0;
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(ChunkCount);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(CreateChunk(Size));
- SingleBlockSize += Size;
- }
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkCount);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
- CreateDirectories(CasConfig.RootDirectory);
-
- {
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true);
-
- for (size_t i = 0; i < ChunkCount; i++)
- {
- CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
- }
-
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcContext GcCtx;
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepChunks);
- Cas.Flush();
- Gc.CollectGarbage(GcCtx);
- }
-
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
- std::filesystem::rename(BlockPath, LegacyDataPath);
-
- std::vector<CasDiskIndexEntry> LogEntries;
- std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test");
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CasDiskIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry);
- CasDiskIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
- {
- LogEntries.resize(Header.EntryCount);
- ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
- }
- }
- ObjectIndexFile.Close();
- std::filesystem::remove(IndexPath);
- }
-
- std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test");
- {
- TCasLogFile<CasDiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
- }
- TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
- std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");
- LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
-
- for (const CasDiskIndexEntry& Entry : LogEntries)
- {
- BlockStoreLocation Location = Entry.Location.Get(16);
- LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
- LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
- .Location = LegacyLocation,
- .ContentType = Entry.ContentType,
- .Flags = Entry.Flags};
- LegacyCasLog.Append(LegacyEntry);
- }
- LegacyCasLog.Close();
-
- std::filesystem::remove_all(CasConfig.RootDirectory / "test");
-
- {
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 2048, 16, false);
-
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- CHECK(Cas.HaveChunk(ChunkHashes[i]));
- CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
- }
- }
-}
-
TEST_CASE("compactcas.threadedinsert")
{
// for (uint32_t i = 0; i < 100; ++i)
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path();
-
- CreateDirectories(CasConfig.RootDirectory);
-
const uint64_t kChunkSize = 1048;
const int32_t kChunkCount = 4096;
uint64_t ExpectedSize = 0;
@@ -1724,9 +1324,9 @@ TEST_CASE("compactcas.threadedinsert")
std::atomic<size_t> WorkCompleted = 0;
WorkerThreadPool ThreadPool(4);
- CasGc Gc;
- CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 32768, 16, true);
+ GcManager Gc;
+ CasContainerStrategy Cas(Gc);
+ Cas.Initialize(TempDir.Path(), "test", 32768, 16, true);
{
for (const auto& Chunk : Chunks)
{
@@ -1838,10 +1438,10 @@ TEST_CASE("compactcas.threadedinsert")
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -1879,10 +1479,10 @@ TEST_CASE("compactcas.threadedinsert")
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
{
WorkCompleted = 0;
@@ -1902,53 +1502,6 @@ TEST_CASE("compactcas.threadedinsert")
}
}
-TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true))
-{
- if (true)
- {
- return;
- }
- const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas";
- std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs");
- std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
- std::filesystem::remove_all(TobsBasePath);
- std::filesystem::remove_all(SobsBasePath);
-
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = BigDataPath;
- uint64_t TObsSize = 0;
- {
- CasGc TobsCasGc;
- CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
- TobsCas.Initialize("tobs", 1u << 28, 16, false);
- TObsSize = TobsCas.StorageSize().DiskSize;
- CHECK(TObsSize > 0);
- }
-
- uint64_t SObsSize = 0;
- {
- CasGc SobsCasGc;
- CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
- SobsCas.Initialize("sobs", 1u << 30, 4096, false);
- SObsSize = SobsCas.StorageSize().DiskSize;
- CHECK(SObsSize > 0);
- }
-
- CasGc TobsCasGc;
- CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
- TobsCas.Initialize("tobs", 1u << 28, 16, false);
- GcContext TobsGcCtx;
- TobsCas.CollectGarbage(TobsGcCtx);
- CHECK(TobsCas.StorageSize().DiskSize == TObsSize);
-
- CasGc SobsCasGc;
- CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
- SobsCas.Initialize("sobs", 1u << 30, 4096, false);
- GcContext SobsGcCtx;
- SobsCas.CollectGarbage(SobsGcCtx);
- CHECK(SobsCas.StorageSize().DiskSize == SObsSize);
-}
-
#endif
void
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 114a6a48c..2acac7ca3 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -4,10 +4,11 @@
#include <zencore/zencore.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
+#include "cas.h"
+
#include <atomic>
#include <limits>
#include <unordered_map>
@@ -47,14 +48,18 @@ static_assert(sizeof(CasDiskIndexEntry) == 32);
struct CasContainerStrategy final : public GcStorage
{
- CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
+ CasContainerStrategy(GcManager& Gc);
~CasContainerStrategy();
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(CasChunkSet& InOutChunks);
- void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
+ void FilterChunks(HashKeySet& InOutChunks);
+ void Initialize(const std::filesystem::path& RootDirectory,
+ const std::string_view ContainerBaseName,
+ uint32_t MaxBlockSize,
+ uint64_t Alignment,
+ bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
@@ -65,12 +70,11 @@ private:
void MakeIndexSnapshot();
uint64_t ReadIndexFile();
uint64_t ReadLog(uint64_t SkipEntryCount);
- uint64_t MigrateLegacyData(bool CleanSource);
void OpenContainer(bool IsNewStore);
spdlog::logger& Log() { return m_Log; }
- const CasStoreConfiguration& m_Config;
+ std::filesystem::path m_RootDirectory;
spdlog::logger& m_Log;
uint64_t m_PayloadAlignment = 1u << 4;
uint64_t m_MaxBlockSize = 1u << 28;
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index d074a906f..23e3f4cd8 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -2,6 +2,7 @@
#include "filecas.h"
+#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -16,6 +17,7 @@
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
@@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
-: GcStorage(Gc)
-, m_Config(Config)
-, m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas"))
{
}
@@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy()
}
void
-FileCasStrategy::Initialize(bool IsNewStore)
+FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
{
m_IsInitialized = true;
- CreateDirectories(m_Config.RootDirectory);
+ m_RootDirectory = RootDirectory;
- m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
+ CreateDirectories(m_RootDirectory);
+
+ m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
Stopwatch Timer;
const auto _ = MakeGuard([&] {
- ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
+ ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
std::unordered_set<IoHash> FoundEntries;
@@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
+#if !ZEN_WITH_TESTS
+ ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::ExclusiveLockScope _(LockForHash(ChunkHash));
@@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
// See if file already exists
//
@@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
- ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
if (Ec)
@@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
}
void
-FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
+FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
{
ZEN_ASSERT(m_IsInitialized);
@@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
// a caller, this is something which needs to be taken into account by anyone consuming
// this functionality in any case
- InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
+ InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile&
const std::filesystem::path& RootDirectory;
std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback;
- } CasVisitor{m_Config.RootDirectory};
+ } CasVisitor{m_RootDirectory};
CasVisitor.Callback = std::move(Callback);
FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor);
+ Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
}
void
@@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
{
ZEN_ASSERT(m_IsInitialized);
- std::vector<IoHash> BadHashes;
- std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+ std::vector<IoHash> BadHashes;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ ++ChunkCount;
+ ChunkBytes += Payload.FileSize();
+
+ IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize());
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed)
+ {
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash)
+ {
+ // Hash mismatch
+ BadHashes.push_back(Hash);
+ return;
+ }
+ return;
+ }
+#if ZEN_WITH_TESTS
IoHashStream Hasher;
- Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
+ Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); });
IoHash ComputedHash = Hasher.GetHash();
-
- if (ComputedHash != Hash)
+ if (ComputedHash == Hash)
{
- BadHashes.push_back(Hash);
+ return;
}
-
- ++ChunkCount;
- ChunkBytes.fetch_add(Payload.FileSize());
+#endif
+ BadHashes.push_back(Hash);
});
Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
@@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
}
- Ctx.ReportBadCasChunks(BadHashes);
+ // Let whomever it concerns know about the bad chunks. This could
+ // be used to invalidate higher level data structures more efficiently
+ // than a full validation pass might be able to do
+ Ctx.ReportBadCidChunks(BadHashes);
- ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+ ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
ZEN_ASSERT(m_IsInitialized);
- ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+ ZEN_INFO("collecting garbage from {}", m_RootDirectory);
std::vector<IoHash> ChunksToDelete;
std::atomic<uint64_t> ChunksToDeleteBytes{0};
@@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
Stopwatch TotalTimer;
const auto _ = MakeGuard([&] {
ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
- m_Config.RootDirectory,
+ m_RootDirectory,
NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
DeletedCount,
ChunkCount,
@@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
bool KeepThis = false;
CandidateCas.clear();
CandidateCas.push_back(Hash);
- GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) {
+ GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
ZEN_UNUSED(Hash);
KeepThis = true;
});
@@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (ChunksToDelete.empty())
{
- ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory);
+ ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
return;
}
ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
- m_Config.RootDirectory,
+ m_RootDirectory,
ChunksToDelete.size(),
ChunkCount.load(),
NiceBytes(ChunksToDeleteBytes));
@@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (Ec)
{
- ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message());
+ ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message());
continue;
}
DeletedCount++;
}
- GcCtx.DeletedCas(ChunksToDelete);
+ GcCtx.AddDeletedCids(ChunksToDelete);
}
//////////////////////////////////////////////////////////////////////////
@@ -769,13 +790,10 @@ TEST_CASE("cas.file.move")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasGc Gc;
+ GcManager Gc;
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc")
// specifying an absolute path here can be helpful when using procmon to dig into things
ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
- CasStoreConfiguration CasConfig;
- CasConfig.RootDirectory = TempDir.Path() / "cas";
-
- CasGc Gc;
- FileCasStrategy FileCas(CasConfig, Gc);
- FileCas.Initialize(/* IsNewStore */ true);
+ GcManager Gc;
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
const int kIterationCount = 1000;
std::vector<IoHash> Keys{kIterationCount};
@@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc")
{
if (Key.Hash[0] & 1)
{
- Ctx.ContributeCas(std::vector<IoHash>{Key});
+ Ctx.AddRetainedCids(std::vector<IoHash>{Key});
}
}
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index ef67ae9eb..f14e5d057 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -8,10 +8,11 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/thread.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
+#include "cas.h"
+
#include <atomic>
#include <functional>
@@ -28,28 +29,28 @@ class BasicFile;
struct FileCasStrategy final : public GcStorage
{
- FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
+ FileCasStrategy(GcManager& Gc);
~FileCasStrategy();
- void Initialize(bool IsNewStore);
+ void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
- void FilterChunks(CasChunkSet& InOutChunks);
+ void FilterChunks(HashKeySet& InOutChunks);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
private:
- const CasStoreConfiguration& m_Config;
- RwLock m_Lock;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
- std::atomic_uint64_t m_TotalSize{};
- bool m_IsInitialized = false;
+ std::filesystem::path m_RootDirectory;
+ RwLock m_Lock;
+ RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+ std::atomic_uint64_t m_TotalSize{};
+ bool m_IsInitialized = false;
struct FileCasIndexEntry
{
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index bb03b9751..0902abf4a 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -14,9 +14,10 @@
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/timer.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include "cas.h"
+
#include <fmt/format.h>
#include <filesystem>
@@ -173,9 +174,8 @@ struct GcContext::GcState
using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>;
CacheKeyContexts m_ExpiredCacheKeys;
- CasChunkSet m_CasChunks;
- CasChunkSet m_DeletedCasChunks;
- CasChunkSet m_CidChunks;
+ HashKeySet m_RetainedCids;
+ HashKeySet m_DeletedCids;
GcClock::TimePoint m_GcTime;
GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24);
bool m_DeletionMode = true;
@@ -194,19 +194,13 @@ GcContext::~GcContext()
}
void
-GcContext::ContributeCids(std::span<const IoHash> Cids)
-{
- m_State->m_CidChunks.AddChunksToSet(Cids);
-}
-
-void
-GcContext::ContributeCas(std::span<const IoHash> Cas)
+GcContext::AddRetainedCids(std::span<const IoHash> Cids)
{
- m_State->m_CasChunks.AddChunksToSet(Cas);
+ m_State->m_RetainedCids.AddHashesToSet(Cids);
}
void
-GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
+GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys)
{
m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys);
}
@@ -214,37 +208,31 @@ GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<I
void
GcContext::IterateCids(std::function<void(const IoHash&)> Callback)
{
- m_State->m_CidChunks.IterateChunks([&](const IoHash& Hash) { Callback(Hash); });
+ m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); });
}
void
GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
{
- m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
+ m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
}
void
-GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc)
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc)
{
- m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); });
+ m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc));
}
void
-GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc)
+GcContext::AddDeletedCids(std::span<const IoHash> Cas)
{
- m_State->m_CasChunks.FilterChunks(Cas, std::move(FilterFunc));
+ m_State->m_DeletedCids.AddHashesToSet(Cas);
}
-void
-GcContext::DeletedCas(std::span<const IoHash> Cas)
+const HashKeySet&
+GcContext::DeletedCids()
{
- m_State->m_DeletedCasChunks.AddChunksToSet(Cas);
-}
-
-CasChunkSet&
-GcContext::DeletedCas()
-{
- return m_State->m_DeletedCasChunks;
+ return m_State->m_DeletedCids;
}
std::span<const IoHash>
@@ -318,7 +306,7 @@ GcContext::ClaimGCReserve()
//////////////////////////////////////////////////////////////////////////
-GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc)
+GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc)
{
m_Gc.AddGcContributor(this);
}
@@ -330,7 +318,7 @@ GcContributor::~GcContributor()
//////////////////////////////////////////////////////////////////////////
-GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc)
+GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc)
{
m_Gc.AddGcStorage(this);
}
@@ -342,30 +330,30 @@ GcStorage::~GcStorage()
//////////////////////////////////////////////////////////////////////////
-CasGc::CasGc()
+GcManager::GcManager()
{
}
-CasGc::~CasGc()
+GcManager::~GcManager()
{
}
void
-CasGc::AddGcContributor(GcContributor* Contributor)
+GcManager::AddGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
m_GcContribs.push_back(Contributor);
}
void
-CasGc::RemoveGcContributor(GcContributor* Contributor)
+GcManager::RemoveGcContributor(GcContributor* Contributor)
{
RwLock::ExclusiveLockScope _(m_Lock);
std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; });
}
void
-CasGc::AddGcStorage(GcStorage* Storage)
+GcManager::AddGcStorage(GcStorage* Storage)
{
ZEN_ASSERT(Storage != nullptr);
RwLock::ExclusiveLockScope _(m_Lock);
@@ -373,14 +361,14 @@ CasGc::AddGcStorage(GcStorage* Storage)
}
void
-CasGc::RemoveGcStorage(GcStorage* Storage)
+GcManager::RemoveGcStorage(GcStorage* Storage)
{
RwLock::ExclusiveLockScope _(m_Lock);
std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; });
}
void
-CasGc::CollectGarbage(GcContext& GcCtx)
+GcManager::CollectGarbage(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
@@ -394,36 +382,6 @@ CasGc::CollectGarbage(GcContext& GcCtx)
}
}
- // Cache records reference CAS chunks with the uncompressed
- // raw hash (Cid). Map the content ID to CAS hash to enable
- // the CAS storage backends to filter valid chunks.
-
- if (CidStore* CidStore = m_CidStore)
- {
- std::vector<IoHash> CasHashes;
- uint64_t UnknownChunks = 0;
-
- GcCtx.IterateCids([&](const IoHash& Cid) {
- IoHash Cas = CidStore->RemapCid(Cid);
-
- if (Cas == IoHash::Zero)
- {
- ++UnknownChunks;
- }
- else
- {
- CasHashes.push_back(Cas);
- }
- });
-
- if (UnknownChunks)
- {
- ZEN_WARN("found {} unknown CIDs", UnknownChunks);
- }
-
- GcCtx.ContributeCas(CasHashes);
- }
-
// Then trim storage
{
@@ -434,61 +392,48 @@ CasGc::CollectGarbage(GcContext& GcCtx)
Storage->CollectGarbage(GcCtx);
}
}
+}
+
+GcStorageSize
+GcManager::TotalStorageSize() const
+{
+ RwLock::SharedLockScope _(m_Lock);
- // Remove Cid to CAS hash mappings. Scrub?
+ GcStorageSize TotalSize;
- if (CidStore* CidStore = m_CidStore)
+ for (GcStorage* Storage : m_GcStorage)
{
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- CidStore->RemoveCids(GcCtx.DeletedCas());
+ const auto Size = Storage->StorageSize();
+ TotalSize.DiskSize += Size.DiskSize;
+ TotalSize.MemorySize += Size.MemorySize;
}
-}
-void
-CasGc::SetCidStore(CidStore* Cids)
-{
- m_CidStore = Cids;
+ return TotalSize;
}
+#if ZEN_USE_REF_TRACKING
void
-CasGc::OnNewCidReferences(std::span<IoHash> Hashes)
+GcManager::OnNewCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
void
-CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes)
+GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
void
-CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
+GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
-
-GcStorageSize
-CasGc::TotalStorageSize() const
-{
- RwLock::SharedLockScope _(m_Lock);
-
- GcStorageSize TotalSize;
-
- for (GcStorage* Storage : m_GcStorage)
- {
- const auto Size = Storage->StorageSize();
- TotalSize.DiskSize += Size.DiskSize;
- TotalSize.MemorySize += Size.MemorySize;
- }
-
- return TotalSize;
-}
+#endif
//////////////////////////////////////////////////////////////////////////
-GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc)
+GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager)
{
}
@@ -606,7 +551,7 @@ GcScheduler::SchedulerThread()
{
std::error_code Ec;
DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec);
- GcStorageSize TotalSize = m_CasGc.TotalStorageSize();
+ GcStorageSize TotalSize = m_GcManager.TotalStorageSize();
std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now());
if (RemaingTime < std::chrono::seconds::zero())
@@ -668,7 +613,7 @@ GcScheduler::SchedulerThread()
Stopwatch Timer;
const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- m_CasGc.CollectGarbage(GcCtx);
+ m_GcManager.CollectGarbage(GcCtx);
m_LastGcTime = GcClock::Now();
m_NextGcTime = NextGcTime(m_LastGcTime);
@@ -745,38 +690,37 @@ TEST_CASE("gc.basic")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
+ CidStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- CasGc Gc;
- std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
- CidStore CidStore{*CasStore, TempDir.Path() / "cid"};
+ GcManager Gc;
+ CidStore CidStore(Gc);
- CasStore->Initialize(CasConfig);
- Gc.SetCidStore(&CidStore);
+ CidStore.Initialize(CasConfig);
IoBuffer Chunk = CreateChunk(128);
auto CompressedChunk = Compress(Chunk);
const auto InsertResult = CidStore.AddChunk(CompressedChunk);
+ CHECK(InsertResult.New);
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- CasStore->Flush();
+ CidStore.Flush();
Gc.CollectGarbage(GcCtx);
- CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId));
+ CHECK(!CidStore.ContainsChunk(IoHash::FromBLAKE3(CompressedChunk.GetRawHash())));
}
TEST_CASE("gc.full")
{
ScopedTemporaryDirectory TempDir;
- CasStoreConfiguration CasConfig;
+ CidStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- CasGc Gc;
+ GcManager Gc;
std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
CasStore->Initialize(CasConfig);
@@ -813,7 +757,7 @@ TEST_CASE("gc.full")
CasStore->InsertChunk(Chunks[7], ChunkHashes[7]);
CasStore->InsertChunk(Chunks[8], ChunkHashes[8]);
- CasStoreSize InitialSize = CasStore->TotalSize();
+ CidStoreSize InitialSize = CasStore->TotalSize();
// Keep first and last
{
@@ -823,7 +767,7 @@ TEST_CASE("gc.full")
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[0]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -856,7 +800,7 @@ TEST_CASE("gc.full")
GcCtx.CollectSmallObjects(true);
std::vector<IoHash> KeepChunks;
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -890,7 +834,7 @@ TEST_CASE("gc.full")
KeepChunks.push_back(ChunkHashes[1]);
KeepChunks.push_back(ChunkHashes[4]);
KeepChunks.push_back(ChunkHashes[7]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
@@ -925,7 +869,7 @@ TEST_CASE("gc.full")
KeepChunks.push_back(ChunkHashes[6]);
KeepChunks.push_back(ChunkHashes[7]);
KeepChunks.push_back(ChunkHashes[8]);
- GcCtx.ContributeCas(KeepChunks);
+ GcCtx.AddRetainedCids(KeepChunks);
CasStore->Flush();
Gc.CollectGarbage(GcCtx);
diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp
new file mode 100644
index 000000000..a5436f5cb
--- /dev/null
+++ b/zenstore/hashkeyset.cpp
@@ -0,0 +1,60 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenstore/hashkeyset.h>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+void
+HashKeySet::AddHashToSet(const IoHash& HashToAdd)
+{
+ m_HashSet.insert(HashToAdd);
+}
+
+void
+HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd)
+{
+ m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end());
+}
+
+void
+HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+ for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;)
+ {
+ if (Predicate(*It))
+ {
+ It = m_HashSet.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const
+{
+ for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It)
+ {
+ Callback(*It);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Testing related code follows...
+//
+
+#if ZEN_WITH_TESTS
+
+void
+hashkeyset_forcelink()
+{
+}
+
+#endif
+
+} // namespace zen
diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h
deleted file mode 100644
index 5592fbd0a..000000000
--- a/zenstore/include/zenstore/cas.h
+++ /dev/null
@@ -1,144 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "zenstore.h"
-
-#include <zencore/blake3.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/refcount.h>
-#include <zencore/timer.h>
-
-#include <atomic>
-#include <filesystem>
-#include <functional>
-#include <memory>
-#include <string>
-#include <unordered_set>
-
-namespace zen {
-
-class GcContext;
-class CasGc;
-
-struct CasStoreConfiguration
-{
- // Root directory for CAS store
- std::filesystem::path RootDirectory;
-
- // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
- uint64_t TinyValueThreshold = 1024;
-
- // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
- uint64_t HugeValueThreshold = 1024 * 1024;
-};
-
-/** Manage a set of IoHash values
- */
-
-class CasChunkSet
-{
-public:
- void AddChunkToSet(const IoHash& HashToAdd);
- void AddChunksToSet(std::span<const IoHash> HashesToAdd);
- void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
- void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
- [[nodiscard]] inline bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
- [[nodiscard]] inline bool IsEmpty() const { return m_ChunkSet.empty(); }
- [[nodiscard]] inline size_t GetSize() const { return m_ChunkSet.size(); }
-
- inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc)
- {
- for (const IoHash& Candidate : Candidates)
- {
- if (ContainsChunk(Candidate))
- {
- MatchFunc(Candidate);
- }
- }
- }
-
- inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc)
- {
- for (const IoHash& Candidate : Candidates)
- {
- MatchFunc(Candidate, ContainsChunk(Candidate));
- }
- }
-
-private:
- // Q: should we protect this with a lock, or is that a higher level concern?
- std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet;
-};
-
-/** Context object for data scrubbing
- *
- * Data scrubbing is when we traverse stored data to validate it and
- * optionally correct/recover
- */
-
-class ScrubContext
-{
-public:
- virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks);
- inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
- inline bool RunRecovery() const { return m_Recover; }
- void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes);
-
- inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
- inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
-
-private:
- uint64_t m_ScrubTime = GetHifreqTimerValue();
- bool m_Recover = true;
- std::atomic<uint64_t> m_ChunkCount{0};
- std::atomic<uint64_t> m_ByteCount{0};
- CasChunkSet m_BadCas;
- CasChunkSet m_BadCid;
-};
-
-struct CasStoreSize
-{
- uint64_t TinySize{};
- uint64_t SmallSize{};
- uint64_t LargeSize{};
- uint64_t TotalSize{};
-};
-
-/** Content Addressable Storage interface
-
- */
-
-class CasStore
-{
-public:
- virtual ~CasStore() = default;
-
- const CasStoreConfiguration& Config() { return m_Config; }
-
- struct InsertResult
- {
- bool New = false;
- };
-
- virtual void Initialize(const CasStoreConfiguration& Config) = 0;
- virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(CasChunkSet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void Scrub(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CasStoreSize TotalSize() const = 0;
-
-protected:
- CasStoreConfiguration m_Config;
- uint64_t m_LastScrubTime = 0;
-};
-
-ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc);
-
-void CAS_forcelink();
-
-} // namespace zen
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 4b93a708f..c56b653fc 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -2,8 +2,6 @@
#pragma once
-#include "zenstore.h"
-
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index b0252a2a6..21e3c3160 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -5,7 +5,7 @@
#include "zenstore.h"
#include <zencore/iohash.h>
-#include <zenstore/cas.h>
+#include <zenstore/hashkeyset.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -15,53 +15,68 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+class GcManager;
class CasStore;
class CompressedBuffer;
class IoBuffer;
+class ScrubContext;
/** Content Store
*
- * Data in the content store is referenced by content identifiers (CIDs), rather than their
- * literal hash. This class maps uncompressed hashes to compressed hashes and may
+ * Data in the content store is referenced by content identifiers (CIDs), it works
+ * with compressed buffers so the CID is expected to be the RAW hash. It stores the
+ * chunk directly under the RAW hash.
+ * This class maps uncompressed hashes (CIDs) to compressed hashes and may
* be used to deal with other kinds of indirections in the future. For example, if we want
* to support chunking then a CID may represent a list of chunks which could be concatenated
* to form the referenced chunk.
*
- * It would likely be possible to implement this mapping in a more efficient way if we
- * integrate it into the CAS store itself, so we can avoid maintaining copies of large
- * hashes in multiple locations. This would also allow us to consolidate commit logs etc
- * which would be more resilient than the current split log scheme
- *
*/
+
+struct CidStoreSize
+{
+ uint64_t TinySize = 0;
+ uint64_t SmallSize = 0;
+ uint64_t LargeSize = 0;
+ uint64_t TotalSize = 0;
+};
+
+struct CidStoreConfiguration
+{
+ // Root directory for CAS store
+ std::filesystem::path RootDirectory;
+
+ // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
+ uint64_t TinyValueThreshold = 1024;
+
+ // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy
+ uint64_t HugeValueThreshold = 1024 * 1024;
+};
+
class CidStore
{
public:
- CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir);
+ CidStore(GcManager& Gc);
~CidStore();
struct InsertResult
{
- IoHash DecompressedId;
- IoHash CompressedHash;
- bool New = false;
+ bool New = false;
};
- InsertResult AddChunk(CompressedBuffer& ChunkData);
- void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed);
+ void Initialize(const CidStoreConfiguration& Config);
+ InsertResult AddChunk(const CompressedBuffer& ChunkData);
IoBuffer FindChunkByCid(const IoHash& DecompressedId);
bool ContainsChunk(const IoHash& DecompressedId);
+ void FilterChunks(HashKeySet& InOutChunks);
void Flush();
void Scrub(ScrubContext& Ctx);
- void RemoveCids(CasChunkSet& CasChunks);
- CasStoreSize CasSize() const;
-
- // TODO: add batch filter support
-
- IoHash RemapCid(const IoHash& DecompressedId);
+ CidStoreSize TotalSize() const;
private:
struct Impl;
- std::unique_ptr<Impl> m_Impl;
+ std::unique_ptr<CasStore> m_CasStore;
+ std::unique_ptr<Impl> m_Impl;
};
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 398025181..656e594af 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -22,8 +22,8 @@ class logger;
namespace zen {
-class CasChunkSet;
-class CasGc;
+class HashKeySet;
+class GcManager;
class CidStore;
struct IoHash;
@@ -50,18 +50,16 @@ public:
GcContext(GcClock::TimePoint Time = GcClock::Now());
~GcContext();
- void ContributeCids(std::span<const IoHash> Cid);
- void ContributeCas(std::span<const IoHash> Hash);
- void ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
+ void AddRetainedCids(std::span<const IoHash> Cid);
+ void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);
void IterateCids(std::function<void(const IoHash&)> Callback);
void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
- void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc);
- void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc);
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc);
- void DeletedCas(std::span<const IoHash> Cas);
- CasChunkSet& DeletedCas();
+ void AddDeletedCids(std::span<const IoHash> Cas);
+ const HashKeySet& DeletedCids();
std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const;
@@ -97,13 +95,13 @@ private:
class GcContributor
{
public:
- GcContributor(CasGc& Gc);
+ GcContributor(GcManager& Gc);
~GcContributor();
virtual void GatherReferences(GcContext& GcCtx) = 0;
protected:
- CasGc& m_Gc;
+ GcManager& m_Gc;
};
struct GcStorageSize
@@ -117,23 +115,23 @@ struct GcStorageSize
class GcStorage
{
public:
- GcStorage(CasGc& Gc);
+ GcStorage(GcManager& Gc);
~GcStorage();
virtual void CollectGarbage(GcContext& GcCtx) = 0;
virtual GcStorageSize StorageSize() const = 0;
private:
- CasGc& m_Gc;
+ GcManager& m_Gc;
};
/** GC orchestrator
*/
-class CasGc
+class GcManager
{
public:
- CasGc();
- ~CasGc();
+ GcManager();
+ ~GcManager();
void AddGcContributor(GcContributor* Contributor);
void RemoveGcContributor(GcContributor* Contributor);
@@ -143,12 +141,14 @@ public:
void CollectGarbage(GcContext& GcCtx);
- void SetCidStore(CidStore* Cids);
- void OnNewCidReferences(std::span<IoHash> Hashes);
- void OnCommittedCidReferences(std::span<IoHash> Hashes);
- void OnDroppedCidReferences(std::span<IoHash> Hashes);
GcStorageSize TotalStorageSize() const;
+#if ZEN_USE_REF_TRACKING
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
+#endif
+
private:
mutable RwLock m_Lock;
std::vector<GcContributor*> m_GcContribs;
@@ -180,7 +180,7 @@ struct GcSchedulerConfig
class GcScheduler
{
public:
- GcScheduler(CasGc& CasGc);
+ GcScheduler(GcManager& GcManager);
~GcScheduler();
void Initialize(const GcSchedulerConfig& Config);
@@ -201,7 +201,7 @@ private:
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- CasGc& m_CasGc;
+ GcManager& m_GcManager;
GcSchedulerConfig m_Config;
GcClock::TimePoint m_LastGcTime{};
GcClock::TimePoint m_NextGcTime{};
diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h
new file mode 100644
index 000000000..411a6256e
--- /dev/null
+++ b/zenstore/include/zenstore/hashkeyset.h
@@ -0,0 +1,54 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "zenstore.h"
+
+#include <zencore/iohash.h>
+
+#include <functional>
+#include <unordered_set>
+
+namespace zen {
+
+/** Manage a set of IoHash values
+ */
+
+class HashKeySet
+{
+public:
+ void AddHashToSet(const IoHash& HashToAdd);
+ void AddHashesToSet(std::span<const IoHash> HashesToAdd);
+ void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+ void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const;
+ [[nodiscard]] inline bool ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); }
+ [[nodiscard]] inline bool IsEmpty() const { return m_HashSet.empty(); }
+ [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); }
+
+ inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const
+ {
+ for (const IoHash& Candidate : Candidates)
+ {
+ if (ContainsHash(Candidate))
+ {
+ MatchFunc(Candidate);
+ }
+ }
+ }
+
+ inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const
+ {
+ for (const IoHash& Candidate : Candidates)
+ {
+ MatchFunc(Candidate, ContainsHash(Candidate));
+ }
+ }
+
+private:
+ // Q: should we protect this with a lock, or is that a higher level concern?
+ std::unordered_set<IoHash, IoHash::Hasher> m_HashSet;
+};
+
+void hashkeyset_forcelink();
+
+} // namespace zen
diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h
new file mode 100644
index 000000000..bf906492c
--- /dev/null
+++ b/zenstore/include/zenstore/scrubcontext.h
@@ -0,0 +1,40 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/timer.h>
+
+namespace zen {
+
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ */
+
+class ScrubContext
+{
+public:
+ virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+ inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+ inline bool RunRecovery() const { return m_Recover; }
+ void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+ {
+ m_ChunkCount.fetch_add(ChunkCount);
+ m_ByteCount.fetch_add(ChunkBytes);
+ }
+
+ inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+ inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
+
+ const HashKeySet BadCids() const { return m_BadCid; }
+
+private:
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
+ std::atomic<uint64_t> m_ChunkCount{0};
+ std::atomic<uint64_t> m_ByteCount{0};
+ HashKeySet m_BadCid;
+};
+
+} // namespace zen
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index 5f40b7f60..836cdf691 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -4,8 +4,10 @@
#include <zenstore/basicfile.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/gc.h>
+#include <zenstore/hashkeyset.h>
+
+#include "cas.h"
#include "compactcas.h"
#include "filecas.h"
@@ -20,6 +22,7 @@ zenstore_forcelinktests()
blockstore_forcelink();
compactcas_forcelink();
gc_forcelink();
+ hashkeyset_forcelink();
}
} // namespace zen