diff options
| author | Dan Engelbrecht <[email protected]> | 2022-06-17 07:06:21 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-06-17 07:06:21 -0700 |
| commit | c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch) | |
| tree | 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd | |
| parent | fixed merge mistake which caused a build error (diff) | |
| download | zen-0.1.4-pre6.tar.xz zen-0.1.4-pre6.zip | |
Make cas storage a hidden implementation detail of CidStore (#130) v0.1.4-pre6 v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION
- CasStore is no longer a public API; it is hidden behind CidStore
- Moved cas.h from public header folder
- CidStore no longer maps from Cid -> Cas; entries are stored in Cas under RawHash
- CasStore now decompresses data to validate content (matching against RawHash)
- CasChunkSet renamed to HashKeySet and moved to a separate header/cpp file
- Disabled "Chunk" command for now as it relied on CAS being exposed as a service
- Changed CAS http service to Cid http server
- Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define
- Removed "cas.basic" test
- Uncommented ".exec.basic" test and added return-skip at start of test
- Moved ScrubContext to separate header file
- Renamed CasGC to GcManager
- Cleaned up configuration passing in cas store classes
- Removed CAS stuff from GcContext and clarified naming in class
- Removed migration code
40 files changed, 1033 insertions, 2147 deletions
@@ -1,6 +1,6 @@ -- Copyright Epic Games, Inc. All Rights Reserved. -set_configvar("ZEN_SCHEMA_VERSION", 3) -- changed cas oplog format (p3rl) +set_configvar("ZEN_SCHEMA_VERSION", 4) -- store Cid data in CAS under raw hash (dan.engelbrecht) add_requires( "vcpkg::asio", diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp index 42ea8eddc..3fd7c4c1f 100644 --- a/zen/chunk/chunk.cpp +++ b/zen/chunk/chunk.cpp @@ -2,39 +2,39 @@ #include "chunk.h" -#include <gsl/gsl-lite.hpp> - -#include <zencore/filesystem.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/refcount.h> -#include <zencore/scopeguard.h> -#include <zencore/sha1.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zenstore/cas.h> -#include <zenstore/gc.h> - -#include "../internalfile.h" - -#include <lz4.h> -#include <zstd.h> - -#if ZEN_PLATFORM_WINDOWS -# include <ppl.h> -# include <ppltasks.h> -#endif // ZEN_PLATFORM_WINDOWS - -#include <cmath> -#include <filesystem> -#include <random> -#include <vector> +#if 0 +# include <gsl/gsl-lite.hpp> + +# include <zencore/filesystem.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zencore/refcount.h> +# include <zencore/scopeguard.h> +# include <zencore/sha1.h> +# include <zencore/string.h> +# include <zencore/testing.h> +# include <zencore/thread.h> +# include <zencore/timer.h> +# include <zenstore/gc.h> + +# include "../internalfile.h" + +# include <lz4.h> +# include <zstd.h> + +# if ZEN_PLATFORM_WINDOWS +# include <ppl.h> +# include <ppltasks.h> +# endif // ZEN_PLATFORM_WINDOWS + +# include <cmath> +# include <filesystem> +# include <random> +# include <vector> ////////////////////////////////////////////////////////////////////////// -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC namespace Concurrency { @@ -75,7 +75,7 @@ struct task_group } // namespace Concurrency -#endif // 
ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC ////////////////////////////////////////////////////////////////////////// @@ -595,9 +595,9 @@ protected: { zen::RwLock HashLock; std::unordered_set<zen::IoHash, zen::IoHash::Hasher> Hashes; -#if ZEN_PLATFORM_WINDOWS -# pragma warning(suppress : 4324) // Padding due to alignment -#endif +# if ZEN_PLATFORM_WINDOWS +# pragma warning(suppress : 4324) // Padding due to alignment +# endif }; Bucket m_Buckets[256]; @@ -902,7 +902,7 @@ public: m_BufferManager.ReturnBuffer(Buffer); -#if 0 +# if 0 Active.AddCount(); // needs fixing Concurrency::create_task([this, Zfile, CurrentPosition, DataPointer, &Active] { @@ -934,7 +934,7 @@ public: _aligned_free(CompressBuffer); }); -#endif +# endif } StatsBlock& Stats = m_StatsBlock.local(); @@ -990,7 +990,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::unique_ptr<zen::CasStore> CasStore; - zen::CasGc Gc; + zen::GcManager Gc; if (!m_RootDirectory.empty()) { @@ -1056,14 +1056,14 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) zen::Stopwatch timer; -#if 1 +# if 1 Concurrency::parallel_for_each(begin(Files), end(Files), [&Chunker](const auto& ThisFile) { Chunker.ChunkFile(ThisFile); }); -#else +# else for (const auto& ThisFile : Files) { Chunker.ChunkFile(ThisFile); } -#endif +# endif uint64_t ElapsedMs = timer.GetElapsedTimeMs(); @@ -1106,7 +1106,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ////////////////////////////////////////////////////////////////////////// -#if ZEN_WITH_TESTS +# if ZEN_WITH_TESTS TEST_CASE("chunking") { using namespace zen; @@ -1209,4 +1209,5 @@ TEST_CASE("chunking") SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); } } +# endif #endif diff --git a/zen/chunk/chunk.h b/zen/chunk/chunk.h index f93f7e4f2..32d87b1b7 100644 --- a/zen/chunk/chunk.h +++ b/zen/chunk/chunk.h 
@@ -4,6 +4,7 @@ #include <zencore/zencore.h> #include "../zen.h" +#if 0 class ChunkCommand : public ZenCmdBase { public: @@ -21,3 +22,4 @@ private: size_t m_AverageChunkSize = 0; bool m_UseCompression = true; }; +#endif // 0 diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp index 0fde4648a..86e67b391 100644 --- a/zen/cmds/run.cpp +++ b/zen/cmds/run.cpp @@ -4,28 +4,30 @@ #include "run.h" -#include <zencore/compactbinarybuilder.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/stream.h> -#include <zencore/string.h> -#include <zencore/timer.h> -#include <zenutil/zenserverprocess.h> - -#include <filesystem> +#if ZEN_WITH_EXEC_SERVICES + +# include <zencore/compactbinarybuilder.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iohash.h> +# include <zencore/logging.h> +# include <zencore/stream.h> +# include <zencore/string.h> +# include <zencore/timer.h> +# include <zenutil/zenserverprocess.h> + +# include <filesystem> ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> +# include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -# pragma comment(lib, "Ws2_32.lib") -#endif +# if ZEN_PLATFORM_WINDOWS +# pragma comment(lib, "Crypt32.lib") +# pragma comment(lib, "Wldap32.lib") +# pragma comment(lib, "Ws2_32.lib") +# endif ////////////////////////////////////////////////////////////////////////// @@ -156,7 +158,7 @@ RunCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second); cpr::Response CasResponse = - cpr::Post(cpr::Url("http://localhost:13337/cas"), cpr::Body((const char*)FileData.Data(), FileData.Size())); + cpr::Post(cpr::Url("http://localhost:13337/cid"), cpr::Body((const char*)FileData.Data(), 
FileData.Size())); if (CasResponse.status_code >= 300) { @@ -179,3 +181,5 @@ RunCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } // namespace zen + +#endif // ZEN_WITH_EXEC_SERVICES diff --git a/zen/cmds/run.h b/zen/cmds/run.h index 3e1e3f2b2..c5595f235 100644 --- a/zen/cmds/run.h +++ b/zen/cmds/run.h @@ -4,6 +4,8 @@ #include "../zen.h" +#if ZEN_WITH_EXEC_SERVICES + namespace zen { /** Execute a command (using Zen) @@ -24,3 +26,5 @@ private: }; } // namespace zen + +#endif // ZEN_WITH_EXEC_SERVICES diff --git a/zen/zen.cpp b/zen/zen.cpp index 302e8496f..5abe4b04e 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -20,7 +20,6 @@ #include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/zencore.h> -#include <zenstore/cas.h> #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER @@ -109,12 +108,13 @@ main(int argc, char** argv) auto _ = zen::MakeGuard([] { spdlog::shutdown(); }); - HashCommand HashCmd; - CopyCommand CopyCmd; - DedupCommand DedupCmd; - DropCommand DropCmd; - ChunkCommand ChunkCmd; - RunCommand RunCmd; + HashCommand HashCmd; + CopyCommand CopyCmd; + DedupCommand DedupCmd; + DropCommand DropCmd; +#if ZEN_WITH_EXEC_SERVICES + RunCommand RunCmd; +#endif StatusCommand StatusCmd; TopCommand TopCmd; PrintCommand PrintCmd; @@ -134,14 +134,16 @@ main(int argc, char** argv) const char* CmdSummary; } Commands[] = { // clang-format off - {"chunk", &ChunkCmd, "Perform chunking"}, +// {"chunk", &ChunkCmd, "Perform chunking"}, {"copy", &CopyCmd, "Copy file(s)"}, {"dedup", &DedupCmd, "Dedup files"}, {"drop", &DropCmd, "Drop cache bucket(s)"}, {"hash", &HashCmd, "Compute file hashes"}, {"print", &PrintCmd, "Print compact binary object"}, {"printpackage", &PrintPkgCmd, "Print compact binary package"}, +#if ZEN_WITH_EXEC_SERVICES {"run", &RunCmd, "Remote execution"}, +#endif // ZEN_WITH_EXEC_SERVICES {"status", &StatusCmd, "Show zen status"}, {"ps", &PsCmd, "Enumerate running zen server instances"}, {"top", &TopCmd, "Monitor zen server 
activity"}, diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 46b566a47..0ffb77f3e 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -438,106 +438,6 @@ TEST_CASE("multi.basic") ZEN_INFO("{} requests in {} ({})", RequestCount, zen::NiceTimeSpanMs(Elapsed), zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); } -TEST_CASE("cas.basic") -{ - std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); - - const uint16_t PortNumber = 13337; - - const int IterationCount = 1000; - - std::vector<int> ChunkSizes(IterationCount); - std::vector<zen::IoHash> ChunkHashes(IterationCount); - - { - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); - - std::atomic<uint64_t> RequestCount{0}; - - zen::Stopwatch timer; - - std::mt19937_64 mt; - - auto BaseUri = fmt::format("http://localhost:{}/cas", PortNumber); - - cpr::Session cli; - cli.SetUrl(cpr::Url{BaseUri}); - - // Populate CAS with some generated data - - for (int i = 0; i < IterationCount; ++i) - { - const int ChunkSize = mt() % 10000 + 5; - std::string body = fmt::format("{}", i); - body.resize(ChunkSize, ' '); - - ChunkSizes[i] = ChunkSize; - ChunkHashes[i] = zen::IoHash::HashBuffer(body.data(), body.size()); - - cli.SetBody(body); - - auto res = cli.Post(); - CHECK(!res.error); - - ++RequestCount; - } - - // Verify that the chunks persisted - - for (int i = 0; i < IterationCount; ++i) - { - zen::ExtendableStringBuilder<128> Uri; - Uri << BaseUri << "/"; - ChunkHashes[i].ToHexString(Uri); - - auto res = cpr::Get(cpr::Url{Uri.c_str()}); - CHECK(!res.error); - CHECK(res.status_code == 200); - CHECK(res.text.size() == ChunkSizes[i]); - - zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size()); - - CHECK(ChunkHashes[i] == Hash); - - ++RequestCount; - } - - uint64_t Elapsed = timer.GetElapsedTimeMs(); - - ZEN_INFO("{} requests in {} ({})", - 
RequestCount, - zen::NiceTimeSpanMs(Elapsed), - zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req")); - } - - // Verify that the data persists between process runs (the previous server has exited at this point) - - { - ZenServerInstance Instance2(TestEnv); - Instance2.SetTestDir(TestDir); - Instance2.SpawnServer(PortNumber); - Instance2.WaitUntilReady(); - - for (int i = 0; i < IterationCount; ++i) - { - zen::ExtendableStringBuilder<128> Uri; - Uri << fmt::format("http://localhost:{}/cas/", PortNumber); - ChunkHashes[i].ToHexString(Uri); - - auto res = cpr::Get(cpr::Url{Uri.c_str()}); - CHECK(res.status_code == 200); - CHECK(res.text.size() == ChunkSizes[i]); - - zen::IoHash Hash = zen::IoHash::HashBuffer(res.text.data(), res.text.size()); - - CHECK(ChunkHashes[i] == Hash); - } - } -} - TEST_CASE("project.basic") { using namespace std::literals; @@ -2416,11 +2316,11 @@ struct RemoteExecutionRequest { zen::IoBuffer FileData = zen::IoBufferBuilder::MakeFromFile(It->second); - cpr::Response CasResponse = cpr::Post(cpr::Url(m_CasUri), cpr::Body((const char*)FileData.Data(), FileData.Size())); + cpr::Response CidResponse = cpr::Post(cpr::Url(m_CidUri), cpr::Body((const char*)FileData.Data(), FileData.Size())); - if (CasResponse.status_code >= 300) + if (CidResponse.status_code >= 300) { - ZEN_ERROR("CAS put failed with {}", CasResponse.status_code); + ZEN_ERROR("CID put failed with {}", CidResponse.status_code); } } else @@ -2496,45 +2396,47 @@ private: int m_PortNumber; std::filesystem::path m_TreePath; const std::string m_BaseUri = fmt::format("http://{}:{}/exec/jobs", m_HostName, m_PortNumber); - const std::string m_CasUri = fmt::format("http://{}:{}/cas", m_HostName, m_PortNumber); + const std::string m_CidUri = fmt::format("http://{}:{}/cid", m_HostName, m_PortNumber); Visitor m_Visit{m_TreePath}; zen::BinaryWriter m_MemOut; }; -// TEST_CASE(".exec.basic" /* * doctest::skip(true) */) -//{ -// using namespace std::literals; -// -// std::filesystem::path TestDir = 
TestEnv.CreateNewTestDir(); -// -// const uint16_t PortNumber = 13337; -// -// ZenServerInstance Zen1(TestEnv); -// Zen1.SetTestDir(TestDir); -// Zen1.SpawnServer(PortNumber); -// Zen1.WaitUntilReady(); -// -// std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1"); -// -// { -// RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); -// RemoteRequest.Build("zentest-appstub.exe", ""); -// RemoteRequest.Prep(); -// zen::CbObject Result = RemoteRequest.Exec(); -// -// CHECK(Result["exitcode"sv].AsInt32(-1) == 0); -// } -// -// { -// RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); -// RemoteRequest.Build("zentest-appstub.exe", "-f=1"); -// RemoteRequest.Prep(); -// zen::CbObject Result = RemoteRequest.Exec(); -// -// CHECK(Result["exitcode"sv].AsInt32(-1) == 1); -// } -// } +TEST_CASE(".exec.basic") +{ + if (true) + { + return; + } + using namespace std::literals; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + + const uint16_t PortNumber = 13337; + + ZenServerInstance Zen1(TestEnv); + Zen1.SetTestDir(TestDir); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + + std::filesystem::path TreePath = TestEnv.GetTestRootDir("test/remote1"); + + { + RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); + RemoteRequest.Build("zentest-appstub.exe", ""); + RemoteRequest.Prep(); + zen::CbObject Result = RemoteRequest.Exec(); + CHECK(Result["exitcode"sv].AsInt32(-1) == 0); + } + + { + RemoteExecutionRequest RemoteRequest("localhost", PortNumber, TreePath); + RemoteRequest.Build("zentest-appstub.exe", "-f=1"); + RemoteRequest.Prep(); + zen::CbObject Result = RemoteRequest.Exec(); + CHECK(Result["exitcode"sv].AsInt32(-1) == 1); + } +} # endif // ZEN_WITH_EXEC_SERVICES TEST_CASE("mesh.basic") diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 45bbe062b..1f89e7362 100644 --- a/zenserver/cache/structuredcache.cpp +++ 
b/zenserver/cache/structuredcache.cpp @@ -16,7 +16,6 @@ #include <zencore/trace.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> -#include <zenstore/cas.h> #include <zenutil/cache/cache.h> //#include "cachekey.h" @@ -26,6 +25,7 @@ #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" +#include "zenstore/scrubcontext.h" #include <algorithm> #include <atomic> @@ -859,7 +859,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); - ValidAttachments.emplace_back(InsertResult.DecompressedId); + ValidAttachments.emplace_back(Hash); if (InsertResult.New) { @@ -1205,7 +1205,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); - ValidAttachments.emplace_back(InsertResult.DecompressedId); + ValidAttachments.emplace_back(ValueHash); if (InsertResult.New) { @@ -2382,7 +2382,7 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t TotalCount = HitCount + MissCount; - const CasStoreSize CasSize = m_CidStore.CasSize(); + const CidStoreSize CidSize = m_CidStore.TotalSize(); const GcStorageSize CacheSize = m_CacheStore.StorageSize(); Cbo.BeginObject("cache"); @@ -2400,12 +2400,12 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) m_UpstreamCache.GetStatus(Cbo); Cbo.EndObject(); - Cbo.BeginObject("cas"); + Cbo.BeginObject("cid"); Cbo.BeginObject("size"); - Cbo << "tiny" << CasSize.TinySize; - Cbo << "small" << CasSize.SmallSize; - Cbo << "large" << CasSize.LargeSize; - Cbo << "total" << CasSize.TotalSize; + Cbo << "tiny" << CidSize.TinySize; + Cbo << "small" << CidSize.SmallSize; + Cbo << "large" 
<< CidSize.LargeSize; + Cbo << "total" << CidSize.TotalSize; Cbo.EndObject(); Cbo.EndObject(); diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 4be33170c..4e7ad522d 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -18,6 +18,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> #include <xxhash.h> @@ -66,67 +67,10 @@ namespace { static_assert(sizeof(CacheBucketIndexHeader) == 32); - struct LegacyDiskLocation - { - inline LegacyDiskLocation() = default; - - inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) - : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) - , LowerSize(ValueSize & 0xFFFFffff) - , IndexDataSize(IndexSize) - { - } - - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize) - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value - static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format - - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } - - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t Size() const { return LowerSize; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } - inline ZenContentType GetContentType() const - { - ZenContentType ContentType = ZenContentType::kBinary; - - if (IsFlagSet(LegacyDiskLocation::kStructured)) - { 
- ContentType = ZenContentType::kCbObject; - } - - if (IsFlagSet(LegacyDiskLocation::kCompressed)) - { - ContentType = ZenContentType::kCompressedBinary; - } - - return ContentType; - } - inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; } - - private: - uint64_t OffsetAndFlags = 0; - uint32_t LowerSize = 0; - uint32_t IndexDataSize = 0; - }; - - struct LegacyDiskIndexEntry - { - IoHash Key; - LegacyDiskLocation Location; - }; - #pragma pack(pop) - static_assert(sizeof(LegacyDiskIndexEntry) == 36); - - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".slog"; - const char* LegacyDataExtension = ".sobs"; + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".slog"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { @@ -143,42 +87,6 @@ namespace { return BucketDir / (BucketName + LogExtension); } - std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir) - { - return BucketDir / (std::string("zen") + LogExtension); - } - - std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir) - { - return BucketDir / (std::string("zen") + LegacyDataExtension); - } - - bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured | - LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString()); - return false; - } - if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) - { - return true; - } - uint64_t Size = Entry.Location.Size(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return 
false; - } - return true; - } - bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -262,7 +170,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } -ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir) +ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc) , GcContributor(Gc) , m_RootDir(RootDir) @@ -583,9 +491,25 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) std::vector<IoHash> BadHashes; + auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) { + if (ContentType == ZenContentType::kCbObject) + { + CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); + return Error == CbValidateError::None; + } + if (ContentType == ZenContentType::kCompressedBinary) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed) + { + return false; + } + } + return true; + }; + for (auto& Kv : m_CacheMap) { - if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) + if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -593,7 +517,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (!BadHashes.empty()) { - Ctx.ReportBadCasChunks(BadHashes); + Ctx.ReportBadCidChunks(BadHashes); } } @@ -891,229 +815,6 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount) return 0; }; -uint64_t -ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource) -{ - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); - - if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) - { - return 0; - } - - ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName); - - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); - - 
uint64_t MigratedChunkCount = 0; - uint32_t MigratedBlockCount = 0; - Stopwatch MigrationTimer; - uint64_t TotalSize = 0; - const auto _ = MakeGuard([&] { - ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", - m_BucketDir / m_BucketName, - MigratedChunkCount, - MigratedBlockCount, - NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), - NiceBytes(TotalSize)); - }); - - uint64_t BlockFileSize = 0; - { - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - BlockFileSize = BlockFile.FileSize(); - } - - std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; - uint64_t InvalidEntryCount = 0; - - size_t BlockChunkCount = 0; - TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog; - LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); - { - Stopwatch Timer; - const auto __ = MakeGuard([&] { - ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", - LegacyLogPath, - LegacyDiskIndex.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - if (LegacyCasLog.Initialize()) - { - LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay( - [&](const LegacyDiskIndexEntry& Record) { - if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone)) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - std::string InvalidEntryReason; - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - if (m_Index.contains(Record.Key)) - { - return; - } - LegacyDiskIndex[Record.Key] = Record; - }, - 0); - - std::vector<IoHash> BadEntries; - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyDiskIndexEntry& Record(Entry.second); - if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - continue; - } - if (Record.Location.Offset() + 
Record.Location.Size() <= BlockFileSize) - { - BlockChunkCount++; - continue; - } - ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); - BadEntries.push_back(Entry.first); - } - for (const IoHash& BadHash : BadEntries) - { - LegacyDiskIndex.erase(BadHash); - } - InvalidEntryCount += BadEntries.size(); - } - } - if (InvalidEntryCount) - { - ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName); - } - - if (LegacyDiskIndex.empty()) - { - LegacyCasLog.Close(); - if (CleanSource) - { - // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find - // a manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - } - return 0; - } - - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - CreateDirectories(LogPath.parent_path()); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; - std::vector<BlockStoreLocation> ChunkLocations; - ChunkIndexToChunkHash.reserve(BlockChunkCount); - ChunkLocations.reserve(BlockChunkCount); - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount); - - for (const auto& Entry : LegacyDiskIndex) - { - const IoHash& ChunkHash = Entry.first; - const LegacyDiskLocation& Location = Entry.second.Location; - if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile)) - { - uint8_t Flags = 0xff & (Location.Flags() >> 56); - DiskLocation NewLocation = DiskLocation(Location.Size(), Flags); - LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation}); - continue; - } - size_t 
ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()}); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - TotalSize += Location.Size(); - } - for (const DiskIndexEntry& Entry : LogEntries) - { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - } - CasLog.Append(LogEntries); - - m_BlockStore.Split( - ChunkLocations, - LegacyDataPath, - m_BlocksBasePath, - MaxBlockSize, - BlockStoreDiskLocation::MaxBlockIndex + 1, - m_PayloadAlignment, - CleanSource, - [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( - const BlockStore::MovedChunksArray& MovedChunks) { - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - const LegacyDiskLocation& OldLocation = OldEntry.Location; - uint8_t Flags = 0xff & (OldLocation.Flags() >> 56); - LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)}); - } - for (const DiskIndexEntry& Entry : LogEntries) - { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - } - CasLog.Append(LogEntries); - CasLog.Flush(); - if (CleanSource) - { - std::vector<LegacyDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - const LegacyDiskLocation& OldLocation = OldEntry.Location; - LegacyDiskLocation NewLocation(OldLocation.Offset(), - 
OldLocation.Size(), - 0, - OldLocation.Flags() | LegacyDiskLocation::kTombStone); - LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation}); - } - LegacyCasLog.Append(LegacyLogEntries); - LegacyCasLog.Flush(); - } - MigratedBlockCount++; - MigratedChunkCount += MovedChunks.size(); - }); - - LegacyCasLog.Close(); - CasLog.Close(); - - if (CleanSource) - { - // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find - // a manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. - - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - } - return MigratedChunkCount; -} - void ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew) { @@ -1123,23 +824,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is m_Index.clear(); - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir); - std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); - std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); + std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName); + std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); if (IsNew) { - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir); - fs::remove(LegacyLogPath); - fs::remove(LegacyDataPath); fs::remove(LogPath); fs::remove(IndexPath); fs::remove_all(m_BlocksBasePath); } - uint64_t LogPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(LogPosition); - uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); CreateDirectories(m_BucketDir); @@ -1161,7 +857,7 @@ 
ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0)) + if (IsNew || LogEntryCount > 0) { MakeIndexSnapshot(); } @@ -1309,6 +1005,7 @@ void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; @@ -1341,6 +1038,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { + ++ChunkCount; + ChunkBytes += Loc.Size(); if (Loc.GetContentType() == ZenContentType::kBinary) { ExtendablePathBuilder<256> DataFilePath; @@ -1381,6 +1080,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { @@ -1403,8 +1104,11 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); if (!Buffer) { BadKeys.push_back(Hash); @@ -1422,40 +1126,41 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) _.ReleaseNow(); - if (BadKeys.empty()) - { - return; - } + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / 
m_BucketName); - - if (Ctx.RunRecovery()) + if (!BadKeys.empty()) { - // Deal with bad chunks by removing them from our lookup map - - std::vector<DiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); + ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); + if (Ctx.RunRecovery()) { - RwLock::ExclusiveLockScope __(m_IndexLock); - for (const IoHash& BadKey : BadKeys) - { - // Log a tombstone and delete the in-memory index for the bad entry + // Deal with bad chunks by removing them from our lookup map - const auto It = m_Index.find(BadKey); - DiskLocation Location = It->second.Location; - Location.Flags |= DiskLocation::kTombStone; - LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); - m_Index.erase(BadKey); + std::vector<DiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); + + { + RwLock::ExclusiveLockScope __(m_IndexLock); + for (const IoHash& BadKey : BadKeys) + { + // Log a tombstone and delete the in-memory index for the bad entry + const auto It = m_Index.find(BadKey); + DiskLocation Location = It->second.Location; + Location.Flags |= DiskLocation::kTombStone; + LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location}); + m_Index.erase(BadKey); + } } + m_SlogFile.Append(LogEntries); } - m_SlogFile.Append(LogEntries); } // Let whomever it concerns know about the bad chunks. 
This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCasChunks(BadKeys); + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -1517,7 +1222,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { if (Cids.size() > 1024) { - GcCtx.ContributeCids(Cids); + GcCtx.AddRetainedCids(Cids); Cids.clear(); } @@ -1552,8 +1257,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) } } - GcCtx.ContributeCids(Cids); - GcCtx.ContributeCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); + GcCtx.AddRetainedCids(Cids); + GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys)); } void @@ -1601,7 +1306,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string()); std::vector<IoHash> DeleteCacheKeys; DeleteCacheKeys.reserve(ExpiredCacheKeys.size()); - GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { + GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) { if (Keep) { return; @@ -1752,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); - GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = Index.find(ChunkHash); const DiskLocation& DiskLocation = KeyIt->second.Location; BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment); @@ -1836,7 +1541,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) }, [&]() { return GcCtx.CollectSmallObjects(); }); - GcCtx.DeletedCas(DeletedChunks); + GcCtx.AddDeletedCids(DeletedChunks); } void @@ -2302,7 +2007,7 @@ 
ZenCacheDiskLayer::TotalSize() const static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; -ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration) +ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration) : GcStorage(Gc) , GcContributor(Gc) , m_Gc(Gc) @@ -2313,7 +2018,6 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration) DirectoryContent DirContent; GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); - std::vector<std::string> LegacyBuckets; std::vector<std::string> Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) { @@ -2323,33 +2027,17 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration) Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); continue; } - LegacyBuckets.push_back(DirName); } - ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size()); + ZEN_INFO("Found #{} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath); if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { // default (unspecified) and ue4-ddc namespace points to the same namespace instance - ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); - std::filesystem::path DefaultNamespaceFolder = m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); - - // Move any non-namespace folders into the default namespace folder - for (const std::string& DirName : LegacyBuckets) - { - std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName; - std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; - std::error_code Ec; - std::filesystem::rename(LegacyFolder, NewPath, Ec); - if (Ec) - { - ZEN_ERROR("Unable to move '{}' to '{}', reason 
'{}'", LegacyFolder, NewPath, Ec.message()); - } - } Namespaces.push_back(std::string(UE4DDCNamespaceName)); } @@ -2537,7 +2225,7 @@ TEST_CASE("z$.store") { ScopedTemporaryDirectory TempDir; - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); @@ -2592,7 +2280,7 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256); @@ -2612,7 +2300,7 @@ TEST_CASE("z$.size") } { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); @@ -2635,7 +2323,7 @@ TEST_CASE("z$.size") GcStorageSize CacheSize; { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64); @@ -2655,7 +2343,7 @@ TEST_CASE("z$.size") } { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const GcStorageSize SerializedSize = Zcs.StorageSize(); @@ -2680,7 +2368,7 @@ TEST_CASE("z$.gc") ScopedTemporaryDirectory TempDir; std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; - const auto CollectAndFilter = [](CasGc& Gc, + const auto CollectAndFilter = [](GcManager& Gc, GcClock::TimePoint Time, GcClock::Duration MaxDuration, std::span<const IoHash> Cids, @@ -2693,7 +2381,7 @@ TEST_CASE("z$.gc") }; { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "teardrinker"sv; @@ -2730,7 +2418,7 @@ TEST_CASE("z$.gc") // Expect timestamps to be serialized { - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); std::vector<IoHash> Keep; @@ -2751,7 +2439,7 @@ TEST_CASE("z$.gc") SUBCASE("gc removes standalone values") { ScopedTemporaryDirectory TempDir; - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "fortysixandtwo"sv; 
const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -2799,7 +2487,7 @@ TEST_CASE("z$.gc") SUBCASE("gc removes small objects") { ScopedTemporaryDirectory TempDir; - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); const auto Bucket = "rightintwo"sv; const GcClock::TimePoint CurrentTime = GcClock::Now(); @@ -2848,154 +2536,6 @@ TEST_CASE("z$.gc") } } -TEST_CASE("z$.legacyconversion") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[] = {2041, - 1123, - 1223, - 1239, - 341, - 1412, - 912, - 774, - 341, - 431, - 554, - 1098, - 2048, - 339 + 64 * 1024, - 561 + 64 * 1024, - 16 + 64 * 1024, - 16 + 64 * 1024, - 2048, - 2048}; - size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); - size_t SingleBlockSize = 0; - std::vector<IoBuffer> Chunks; - Chunks.reserve(ChunkCount); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(testutils::CreateBinaryCacheValue(Size)); - SingleBlockSize += Size; - } - - ZEN_UNUSED(SingleBlockSize); - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunkCount); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CreateDirectories(TempDir.Path()); - - const std::string Bucket = "rightintwo"; - { - CasGc Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path()); - const GcClock::TimePoint CurrentTime = GcClock::Now(); - - for (size_t i = 0; i < ChunkCount; i++) - { - Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]}); - } - - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < ChunkCount; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcContext GcCtx(CurrentTime + std::chrono::hours(2)); - GcCtx.MaxCacheDuration(std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepChunks); - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); - } - std::filesystem::path BucketDir = TempDir.Path() / Bucket; - std::filesystem::path BlocksBaseDir = BucketDir / "blocks"; - - std::filesystem::path 
CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1); - std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir); - std::filesystem::remove(LegacyDataPath); - std::filesystem::rename(CasPath, LegacyDataPath); - - std::vector<DiskIndexEntry> LogEntries; - std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket); - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion && - Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) - { - LogEntries.resize(Header.EntryCount); - ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - } - } - ObjectIndexFile.Close(); - std::filesystem::remove(IndexPath); - } - - std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket); - { - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - LogEntries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); - } - TCasLogFile<LegacyDiskIndexEntry> LegacyLog; - std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir); - LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); - - for (const DiskIndexEntry& Entry : LogEntries) - { - uint64_t Size; - uint64_t Offset; - if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - Size = Entry.Location.Location.StandaloneSize; - Offset = 0; - } - else - { - BlockStoreLocation Location = Entry.Location.GetBlockLocation(16); - Size = 
Location.Size; - Offset = Location.Offset; - } - LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56); - LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation}; - LegacyLog.Append(LegacyEntry); - } - LegacyLog.Close(); - - std::filesystem::remove_all(BlocksBaseDir); - std::filesystem::remove(LogPath); - std::filesystem::remove(IndexPath); - - { - CasGc Gc; - ZenCacheNamespace Zcs(Gc, TempDir.Path()); - - for (size_t i = 0; i < ChunkCount; i += 2) - { - ZenCacheValue Value; - CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value)); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value)); - CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value)); - } - } -} - TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) @@ -3045,7 +2585,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) CreateDirectories(TempDir.Path()); WorkerThreadPool ThreadPool(4); - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path()); { @@ -3169,10 +2709,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); Zcs.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) @@ -3217,10 +2757,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true)) GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); Zcs.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { 
GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } } { @@ -3261,7 +2801,7 @@ TEST_CASE("z$.namespaces") IoHash Key1; IoHash Key2; { - CasGc Gc; + GcManager Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -3286,7 +2826,7 @@ TEST_CASE("z$.namespaces") } { - CasGc Gc; + GcManager Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; @@ -3346,7 +2886,7 @@ TEST_CASE("z$.drop.bucket") }; WorkerThreadPool Workers(1); { - CasGc Gc; + GcManager Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket = "teardrinker"sv; const auto Namespace = "mynamespace"sv; @@ -3415,7 +2955,7 @@ TEST_CASE("z$.drop.namespace") }; WorkerThreadPool Workers(1); { - CasGc Gc; + GcManager Gc; ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); const auto Bucket1 = "teardrinker1"sv; const auto Bucket2 = "teardrinker2"sv; @@ -3480,7 +3020,7 @@ TEST_CASE("z$.blocked.disklayer.put") return Writer.Save(); }; - CasGc Gc; + GcManager Gc; ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); CbObject CacheValue = CreateCacheValue(64 * 1024 + 64); @@ -3517,6 +3057,96 @@ TEST_CASE("z$.blocked.disklayer.put") CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); } +TEST_CASE("z$.scrub") +{ + ScopedTemporaryDirectory TempDir; + + using namespace testutils; + + struct CacheRecord + { + IoBuffer Record; + std::vector<CompressedBuffer> Attachments; + }; + + auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& 
Key, const std::vector<size_t>& AttachmentSizes) { + CacheRecord Result; + if (Structured) + { + Result.Attachments.resize(AttachmentSizes.size()); + CbObjectWriter Record; + Record.BeginObject("Key"sv); + { + Record << "Bucket"sv << Bucket; + Record << "Hash"sv << Key; + } + Record.EndObject(); + for (size_t Index = 0; Index < AttachmentSizes.size(); Index++) + { + IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSizes[Index]); + CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData)); + Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash())); + Result.Attachments[Index] = CompressedAttachmentData; + } + Result.Record = Record.Save().GetBuffer().AsIoBuffer(); + Result.Record.SetContentType(ZenContentType::kCbObject); + } + else + { + std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString()); + size_t TotalSize = RecordData.length() + 1; + for (size_t AttachmentSize : AttachmentSizes) + { + TotalSize += AttachmentSize; + } + Result.Record = IoBuffer(TotalSize); + char* DataPtr = (char*)Result.Record.MutableData(); + memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1); + DataPtr += RecordData.length() + 1; + for (size_t AttachmentSize : AttachmentSizes) + { + IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize); + memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize()); + DataPtr += AttachmentData.GetSize(); + } + } + return Result; + }; + + GcManager Gc; + CidStore CidStore(Gc); + ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache"); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + auto CreateRecords = + [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) { + for (const IoHash& Cid : Cids) + { + 
CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}); + for (const CompressedBuffer& Attachment : Record.Attachments) + { + CidStore.AddChunk(Attachment); + } + } + }; + + std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000}; + + std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)}; + CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes); + + std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)}; + CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes); + + ScrubContext ScrubCtx; + Zcs.Scrub(ScrubCtx); + CidStore.Scrub(ScrubCtx); + CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); + CHECK(ScrubCtx.BadCids().GetSize() == 0); +} + #endif void diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index ea33a3c00..b81e44835 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -9,7 +9,6 @@ #include <zencore/uid.h> #include <zenstore/basicfile.h> #include <zenstore/blockstore.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> @@ -27,8 +26,9 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { class PathBuilderBase; -class CasGc; +class GcManager; class ZenCacheTracker; +class ScrubContext; /****************************************************************************** @@ -327,7 +327,7 @@ private: class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor { public: - ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir); + ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir); ~ZenCacheNamespace(); bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -369,7 +369,7 @@ public: bool 
AllowAutomaticCreationOfNamespaces = true; }; - ZenCacheStore(CasGc& Gc, const Configuration& Configuration); + ZenCacheStore(GcManager& Gc, const Configuration& Configuration); ~ZenCacheStore(); bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); @@ -393,7 +393,7 @@ private: NamespaceMap m_Namespaces; std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; - CasGc& m_Gc; + GcManager& m_Gc; Configuration m_Configuration; }; diff --git a/zenserver/casstore.cpp b/zenserver/cidstore.cpp index 872a40df8..5de347a17 100644 --- a/zenserver/casstore.cpp +++ b/zenserver/cidstore.cpp @@ -1,23 +1,25 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "casstore.h" +#include "cidstore.h" +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zenstore/cidstore.h> #include <gsl/gsl-lite.hpp> namespace zen { -HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store) +HttpCidService::HttpCidService(CidStore& Store) : m_CidStore(Store) { - m_Router.AddPattern("cas", "([0-9A-Fa-f]{40})"); + m_Router.AddPattern("cid", "([0-9A-Fa-f]{40})"); m_Router.RegisterRoute( - "{cas}", + "{cid}", [this](HttpRouterRequest& Req) { IoHash Hash = IoHash::FromHexString(Req.GetCapture(1)); - ZEN_DEBUG("CAS request for {}", Hash); + ZEN_DEBUG("CID request for {}", Hash); HttpServerRequest& ServerRequest = Req.ServerRequest(); @@ -26,7 +28,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store) case HttpVerb::kGet: case HttpVerb::kHead: { - if (IoBuffer Value = m_CasStore.FindChunk(Hash)) + if (IoBuffer Value = m_CidStore.FindChunkByCid(Hash)) { return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } @@ -37,8 +39,14 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store) case HttpVerb::kPut: { - IoBuffer Payload = ServerRequest.ReadPayload(); - IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), 
Payload.Size()); + IoBuffer Payload = ServerRequest.ReadPayload(); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (!Compressed) + { + return ServerRequest.WriteResponse(HttpResponseCode::UnsupportedMediaType); + } + + IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); // URI hash must match content hash if (PayloadHash != Hash) @@ -46,7 +54,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store) return ServerRequest.WriteResponse(HttpResponseCode::BadRequest); } - m_CasStore.InsertChunk(Payload, PayloadHash); + m_CidStore.AddChunk(Compressed); return ServerRequest.WriteResponse(HttpResponseCode::OK); } @@ -60,13 +68,13 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store) } const char* -HttpCasService::BaseUri() const +HttpCidService::BaseUri() const { - return "/cas/"; + return "/cid/"; } void -HttpCasService::HandleRequest(zen::HttpServerRequest& Request) +HttpCidService::HandleRequest(zen::HttpServerRequest& Request) { if (Request.RelativeUri().empty()) { @@ -77,12 +85,18 @@ HttpCasService::HandleRequest(zen::HttpServerRequest& Request) case HttpVerb::kPut: case HttpVerb::kPost: { - IoBuffer Payload = Request.ReadPayload(); - IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + IoBuffer Payload = Request.ReadPayload(); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (!Compressed) + { + return Request.WriteResponse(HttpResponseCode::UnsupportedMediaType); + } + + IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); - ZEN_DEBUG("CAS POST request for {} ({} bytes)", PayloadHash, Payload.Size()); + ZEN_DEBUG("CID POST request for {} ({} bytes)", PayloadHash, Payload.Size()); - auto InsertResult = m_CasStore.InsertChunk(Payload, PayloadHash); + auto InsertResult = m_CidStore.AddChunk(Compressed); if (InsertResult.New) { diff --git a/zenserver/casstore.h b/zenserver/cidstore.h index 
4ca6908b5..8e7832b35 100644 --- a/zenserver/casstore.h +++ b/zenserver/cidstore.h @@ -3,31 +3,32 @@ #pragma once #include <zenhttp/httpserver.h> -#include <zenstore/cas.h> namespace zen { /** - * Simple CAS store HTTP endpoint + * Simple CID store HTTP endpoint * * Note that since this does not end up pinning any of the chunks it's only really useful for a small subset of use cases where you know a - * chunk exists in the underlying CAS store. Thus it's mainly useful for internal use when communicating between Zen store instances + * chunk exists in the underlying CID store. Thus it's mainly useful for internal use when communicating between Zen store instances * - * Using this interface for adding CAS chunks makes little sense except for testing purposes as garbage collection may reap anything you add + * Using this interface for adding CID chunks makes little sense except for testing purposes as garbage collection may reap anything you add * before anything ever gets to access it */ -class HttpCasService : public HttpService +class CidStore; + +class HttpCidService : public HttpService { public: - explicit HttpCasService(CasStore& Store); - ~HttpCasService() = default; + explicit HttpCidService(CidStore& Store); + ~HttpCidService() = default; virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; private: - CasStore& m_CasStore; + CidStore& m_CidStore; HttpRequestRouter m_Router; }; diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp index 171c67a6e..d7316ac64 100644 --- a/zenserver/compute/function.cpp +++ b/zenserver/compute/function.cpp @@ -17,7 +17,6 @@ # include <zencore/iobuffer.h> # include <zencore/iohash.h> # include <zencore/scopeguard.h> -# include <zenstore/cas.h> # include <zenstore/cidstore.h> # include <span> @@ -26,25 +25,22 @@ using namespace std::literals; namespace zen { -HttpFunctionService::HttpFunctionService(CasStore& Store, - CidStore& InCidStore, 
+HttpFunctionService::HttpFunctionService(CidStore& InCidStore, const CloudCacheClientOptions& ComputeOptions, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& ComputeAuthConfig, const UpstreamAuthConfig& StorageAuthConfig, AuthMgr& Mgr) : m_Log(logging::Get("apply")) -, m_CasStore(Store) , m_CidStore(InCidStore) { - m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); + m_UpstreamApply = UpstreamApply::Create({}, m_CidStore); InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] { auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, ComputeAuthConfig, StorageOptions, StorageAuthConfig, - m_CasStore, m_CidStore, Mgr); m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); @@ -99,18 +95,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, // Determine which pieces are missing and need to be transmitted to populate CAS - CasChunkSet ChunkSet; + HashKeySet ChunkSet; FunctionSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); - ChunkSet.AddChunkToSet(Hash); + ChunkSet.AddHashToSet(Hash); }); // Note that we store executables uncompressed to make it // more straightforward and efficient to materialize them, hence // the CAS lookup here instead of CID for the input payloads - m_CasStore.FilterChunks(ChunkSet); + m_CidStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { @@ -127,7 +123,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); - ChunkSet.IterateChunks([&](const IoHash& Hash) { + ChunkSet.IterateHashes([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); @@ -159,25 +155,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, { ZEN_ASSERT(Attachment.IsCompressedBinary()); - const IoHash DataHash = Attachment.GetHash(); - 
CompressedBuffer DataView = Attachment.AsCompressedBinary(); - SharedBuffer Decompressed = DataView.Decompress(); - const uint64_t DecompressedSize = DataView.GetRawSize(); + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer Buffer = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); - - TotalAttachmentBytes += DecompressedSize; + TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them - - const CasStore::InsertResult InsertResult = - m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer); if (InsertResult.New) { - TotalNewBytes += DecompressedSize; + TotalNewBytes += Buffer.GetCompressedSize(); ++NewAttachmentCount; } } diff --git a/zenserver/compute/function.h b/zenserver/compute/function.h index efabe96ee..650cee757 100644 --- a/zenserver/compute/function.h +++ b/zenserver/compute/function.h @@ -20,7 +20,6 @@ namespace zen { -class CasStore; class CidStore; class UpstreamApply; class CloudCacheClient; @@ -35,8 +34,7 @@ struct CloudCacheClientOptions; class HttpFunctionService : public HttpService { public: - HttpFunctionService(CasStore& Store, - CidStore& InCidStore, + HttpFunctionService(CidStore& InCidStore, const CloudCacheClientOptions& ComputeOptions, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& ComputeAuthConfig, @@ -52,7 +50,6 @@ private: spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; HttpRequestRouter m_Router; - CasStore& m_CasStore; CidStore& m_CidStore; std::unique_ptr<UpstreamApply> m_UpstreamApply; diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 1853941ed..e42704ccf 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -16,8 +16,8 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include 
<zenstore/basicfile.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/scrubcontext.h> #include "config.h" @@ -350,7 +350,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) Hashes.push_back(Kv.second); } - GcCtx.ContributeCids(Hashes); + GcCtx.AddRetainedCids(Hashes); Hashes.clear(); @@ -359,7 +359,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx) Hashes.push_back(Kv.second); } - GcCtx.ContributeCids(Hashes); + GcCtx.AddRetainedCids(Hashes); } bool @@ -872,7 +872,7 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx) ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) : GcContributor(Gc) , m_Log(zen::logging::Get("project")) , m_CidStore(Store) @@ -1482,7 +1482,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { const IoHash FileHash = Entry.AsHash(); - if (!m_CidStore.FindChunkByCid(FileHash)) + if (!m_CidStore.ContainsChunk(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 6a8730ee2..a15556cfa 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -6,8 +6,6 @@ #include <zencore/uid.h> #include <zencore/xxhash.h> #include <zenhttp/httpserver.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> #include <zenstore/cidstore.h> #include <zenstore/gc.h> @@ -65,7 +63,7 @@ class ProjectStore : public RefCounted, public GcContributor struct OplogStorage; public: - ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc); + ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc); ~ProjectStore(); struct Project; diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp index 1236e6adb..0e46fff94 100644 --- a/zenserver/testing/launch.cpp 
+++ b/zenserver/testing/launch.cpp @@ -6,13 +6,14 @@ # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> +# include <zencore/compress.h> # include <zencore/filesystem.h> # include <zencore/fmtutils.h> # include <zencore/iobuffer.h> # include <zencore/iohash.h> # include <zencore/logging.h> # include <zencore/windows.h> -# include <zenstore/cas.h> +# include <zenstore/cidstore.h> ZEN_THIRD_PARTY_INCLUDES_START # include <AccCtrl.h> @@ -322,9 +323,9 @@ SandboxedJob::SpawnJob(std::filesystem::path ExePath) //////////////////////////////////////////////////////////////////////////////// -HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir) +HttpLaunchService::HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir) : m_Log(logging::Get("exec")) -, m_CasStore(Store) +, m_CidStore(Store) , m_SandboxPath(SandboxBaseDir) { m_Router.AddPattern("job", "([[:digit:]]+)"); @@ -402,7 +403,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat const IoHash FileHash = Ob["hash"sv].AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.FindChunkByCid(FileHash)) { ZEN_DEBUG("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64()); @@ -465,7 +466,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat const IoHash FileHash = Ob["hash"sv].AsHash(); uint64_t FileSize = Ob["size"sv].AsUInt64(); - if (IoBuffer Chunk = m_CasStore.FindChunk(FileHash); !Chunk) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(FileHash); !Chunk) { ZEN_DEBUG("MISSING: {} {} {}", FileHash, FileName, FileSize); AllOk = false; @@ -476,9 +477,18 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat { std::filesystem::path FullPath = SandboxDir / FileName; - const IoBuffer* Chunks[] = {&Chunk}; - - zen::WriteFile(FullPath, Chunks, 1); + CompressedBuffer Compressed = 
CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CompositeBuffer CompositeBuffer = Compressed.DecompressToComposite(); + std::span<const SharedBuffer> Segments = CompositeBuffer.GetSegments(); + std::vector<IoBuffer> Chunks(Segments.size()); + std::vector<IoBuffer*> ChunkPtrs(Segments.size()); + for (size_t Index = 0; Index < Segments.size(); ++Index) + { + Chunks[Index] = std::move(Segments[Index].AsIoBuffer()); + ChunkPtrs[Index] = &Chunks[Index]; + } + + zen::WriteFile(FullPath, ChunkPtrs.data(), ChunkPtrs.size()); } } diff --git a/zenserver/testing/launch.h b/zenserver/testing/launch.h index 6fd3e39ae..f44618bfb 100644 --- a/zenserver/testing/launch.h +++ b/zenserver/testing/launch.h @@ -17,7 +17,7 @@ namespace zen { -class CasStore; +class CidStore; /** * Process launcher for test executables @@ -25,7 +25,7 @@ class CasStore; class HttpLaunchService : public HttpService { public: - HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir); + HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir); ~HttpLaunchService(); virtual const char* BaseUri() const override; @@ -36,7 +36,7 @@ private: spdlog::logger& m_Log; HttpRequestRouter m_Router; - CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_SandboxPath; std::atomic<int> m_SandboxCount{0}; diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index 38c798569..22b06d9c4 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -17,7 +17,6 @@ # include <zencore/timer.h> # include <zencore/workthreadpool.h> -# include <zenstore/cas.h> # include <zenstore/cidstore.h> # include <auth/authmgr.h> @@ -49,11 +48,9 @@ namespace detail { const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, - CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) - , 
m_CasStore(CasStore) , m_CidStore(CidStore) , m_AuthMgr(Mgr) { @@ -341,7 +338,7 @@ namespace detail { } else { - DataBuffer = m_CasStore.FindChunk(Hash); + DataBuffer = m_CidStore.FindChunkByCid(Hash); if (!DataBuffer) { Log().warn("Put blob FAILED, input chunk '{}' missing", Hash); @@ -676,7 +673,6 @@ namespace detail { spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - CasStore& m_CasStore; CidStore& m_CidStore; AuthMgr& m_AuthMgr; std::string m_DisplayName; @@ -1218,7 +1214,7 @@ namespace detail { const IoHash ChunkId = FileEntry["hash"sv].AsHash(); const uint64_t Size = FileEntry["size"sv].AsUInt64(); - if (!m_CasStore.ContainsChunk(ChunkId)) + if (!m_CidStore.ContainsChunk(ChunkId)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); return false; @@ -1443,7 +1439,6 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, const UpstreamAuthConfig& StorageAuthConfig, - CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr) { @@ -1451,11 +1446,10 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput ComputeAuthConfig, StorageOptions, StorageAuthConfig, - CasStore, CidStore, Mgr); } } // namespace zen -#endif // ZEN_WITH_COMPUTE_SERVICES
\ No newline at end of file +#endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index c397bb141..c719b225d 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -11,7 +11,6 @@ # include <zencore/timer.h> # include <zencore/workthreadpool.h> -# include <zenstore/cas.h> # include <zenstore/cidstore.h> # include "diag/logging.h" @@ -77,10 +76,9 @@ struct UpstreamApplyStats class UpstreamApplyImpl final : public UpstreamApply { public: - UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) - , m_CasStore(CasStore) , m_CidStore(CidStore) , m_Stats(Options.StatsEnabled) , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount) @@ -429,7 +427,6 @@ private: spdlog::logger& m_Log; UpstreamApplyOptions m_Options; - CasStore& m_CasStore; CidStore& m_CidStore; UpstreamApplyStats m_Stats; UpstreamApplyTasks m_ApplyTasks; @@ -452,9 +449,9 @@ UpstreamApply::IsHealthy() const } std::unique_ptr<UpstreamApply> -UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore) { - return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore); + return std::make_unique<UpstreamApplyImpl>(Options, CidStore); } } // namespace zen diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index 1deaf00a5..4a095be6c 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -18,7 +18,6 @@ namespace zen { class AuthMgr; -class CasStore; class CbObjectWriter; class CidStore; class CloudCacheTokenProvider; @@ -153,7 +152,6 @@ public: const UpstreamAuthConfig& ComputeAuthConfig, const CloudCacheClientOptions& StorageOptions, 
const UpstreamAuthConfig& StorageAuthConfig, - CasStore& CasStore, CidStore& CidStore, AuthMgr& Mgr); }; @@ -186,7 +184,7 @@ public: virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; - static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); + static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CidStore& CidStore); }; } // namespace zen diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 98b4439c7..7d1f72004 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -15,7 +15,6 @@ #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenstore/cas.h> #include <zenstore/cidstore.h> #include <auth/authmgr.h> diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 801a98523..31ca8851a 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -17,8 +17,8 @@ #include <zenhttp/httpserver.h> #include <zenhttp/websocket.h> #include <zenstore/basicfile.h> -#include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include <zenstore/scrubcontext.h> #include <zenutil/zenserverprocess.h> #if ZEN_PLATFORM_WINDOWS @@ -56,7 +56,6 @@ ZEN_THIRD_PARTY_INCLUDES_END ////////////////////////////////////////////////////////////////////////// -#include "casstore.h" #include "config.h" #include "diag/logging.h" @@ -103,6 +102,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "auth/authservice.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" +#include "cidstore.h" #include "compute/function.h" #include "diag/diagsvcs.h" #include "experimental/usnjournal.h" @@ -253,17 +253,16 @@ public: ZEN_INFO("initializing storage"); - zen::CasStoreConfiguration Config; + zen::CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; - m_CasStore->Initialize(Config); - - 
m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid"); - m_CasGc.SetCidStore(m_CidStore.get()); + m_CidStore = std::make_unique<zen::CidStore>(m_GcManager); + m_CidStore->Initialize(Config); + m_CidService.reset(new zen::HttpCidService{*m_CidStore}); ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager); m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); #if ZEN_WITH_EXEC_SERVICES @@ -274,7 +273,7 @@ public: std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; zen::CreateDirectories(SandboxDir); - m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); + m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CidStore, SandboxDir); } else { @@ -328,7 +327,7 @@ public: m_Http->RegisterService(*m_HttpProjectService); } - m_Http->RegisterService(m_CasService); + m_Http->RegisterService(*m_CidService); #if ZEN_WITH_EXEC_SERVICES if (ServerOptions.ExecServiceEnabled) @@ -522,7 +521,6 @@ public: ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; - m_CasStore->Scrub(Ctx); m_CidStore->Scrub(Ctx); m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); @@ -538,9 +536,6 @@ public: void Flush() { - if (m_CasStore) - m_CasStore->Flush(); - if (m_CidStore) m_CidStore->Flush(); @@ -602,14 +597,13 @@ private: std::unique_ptr<zen::HttpAuthService> m_AuthService; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; - zen::CasGc m_CasGc; - zen::GcScheduler m_GcScheduler{m_CasGc}; - std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_CasGc)}; + zen::GcManager m_GcManager; + zen::GcScheduler m_GcScheduler{m_GcManager}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; zen::HttpTestService m_TestService; 
zen::HttpTestingService m_TestingService; - zen::HttpCasService m_CasService{*m_CasStore}; + std::unique_ptr<zen::HttpCidService> m_CidService; zen::RefPtr<zen::ProjectStore> m_ProjectStore; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; @@ -745,7 +739,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) ZEN_INFO("instantiating structured cache service"); m_CacheStore = std::make_unique<ZenCacheStore>( - m_CasGc, + m_GcManager, ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true}); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; @@ -873,8 +867,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider, .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken}; - m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, - *m_CidStore, + m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore, ComputeOptions, StorageOptions, ComputeAuthConfig, diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index 0e1d5b242..54e8cb11c 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. 
-#include <zenstore/cas.h> +#include "cas.h" #include "compactcas.h" #include "filecas.h" @@ -18,7 +18,9 @@ #include <zencore/thread.h> #include <zencore/trace.h> #include <zencore/uid.h> +#include <zenstore/cidstore.h> #include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> #include <gsl/gsl-lite.hpp> @@ -30,58 +32,6 @@ namespace zen { -void -CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) -{ - m_ChunkSet.insert(HashToAdd); -} - -void -CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) -{ - m_ChunkSet.insert(HashesToAdd.begin(), HashesToAdd.end()); -} - -void -CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) -{ - for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) - { - if (Predicate(*It)) - { - It = m_ChunkSet.erase(It); - } - else - { - ++It; - } - } -} - -void -CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) -{ - for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) - { - Callback(*It); - } -} - -////////////////////////////////////////////////////////////////////////// - -void -ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) -{ - m_BadCas.AddChunksToSet(BadCasChunks); -} - -void -ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) -{ - m_ChunkCount.fetch_add(ChunkCount); - m_ByteCount.fetch_add(ChunkBytes); -} - /** * CAS store implementation * @@ -93,18 +43,18 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) class CasImpl : public CasStore { public: - CasImpl(CasGc& Gc); + CasImpl(GcManager& Gc); virtual ~CasImpl(); - virtual void Initialize(const CasStoreConfiguration& InConfig) override; + virtual void Initialize(const CidStoreConfiguration& InConfig) override; virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual bool ContainsChunk(const IoHash& 
ChunkHash) override; - virtual void FilterChunks(CasChunkSet& InOutChunks) override; + virtual void FilterChunks(HashKeySet& InOutChunks) override; virtual void Flush() override; virtual void Scrub(ScrubContext& Ctx) override; virtual void GarbageCollect(GcContext& GcCtx) override; - virtual CasStoreSize TotalSize() const override; + virtual CidStoreSize TotalSize() const override; private: CasContainerStrategy m_TinyStrategy; @@ -124,7 +74,7 @@ private: void UpdateManifest(); }; -CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config, Gc), m_SmallStrategy(m_Config, Gc), m_LargeStrategy(m_Config, Gc) +CasImpl::CasImpl(GcManager& Gc) : m_TinyStrategy(Gc), m_SmallStrategy(Gc), m_LargeStrategy(Gc) { } @@ -133,7 +83,7 @@ CasImpl::~CasImpl() } void -CasImpl::Initialize(const CasStoreConfiguration& InConfig) +CasImpl::Initialize(const CidStoreConfiguration& InConfig) { m_Config = InConfig; @@ -149,9 +99,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) // Initialize payload storage - m_LargeStrategy.Initialize(IsNewStore); - m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block - m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block + m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); + m_TinyStrategy.Initialize(m_Config.RootDirectory, "tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + m_SmallStrategy.Initialize(m_Config.RootDirectory, "sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block } bool @@ -292,7 +242,7 @@ CasImpl::ContainsChunk(const IoHash& ChunkHash) } void -CasImpl::FilterChunks(CasChunkSet& InOutChunks) +CasImpl::FilterChunks(HashKeySet& InOutChunks) { m_SmallStrategy.FilterChunks(InOutChunks); m_TinyStrategy.FilterChunks(InOutChunks); @@ -330,7 +280,7 @@ CasImpl::GarbageCollect(GcContext& GcCtx) m_LargeStrategy.CollectGarbage(GcCtx); } -CasStoreSize +CidStoreSize CasImpl::TotalSize() const { const uint64_t Tiny = m_TinyStrategy.StorageSize().DiskSize; @@ -343,7 +293,7 
@@ CasImpl::TotalSize() const ////////////////////////////////////////////////////////////////////////// std::unique_ptr<CasStore> -CreateCasStore(CasGc& Gc) +CreateCasStore(GcManager& Gc) { return std::make_unique<CasImpl>(Gc); } @@ -359,10 +309,10 @@ TEST_CASE("CasStore") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration config; + CidStoreConfiguration config; config.RootDirectory = TempDir.Path(); - CasGc Gc; + GcManager Gc; std::unique_ptr<CasStore> Store = CreateCasStore(Gc); Store->Initialize(config); @@ -382,9 +332,9 @@ TEST_CASE("CasStore") CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); CHECK(Result2.New); - CasChunkSet ChunkSet; - ChunkSet.AddChunkToSet(Hash1); - ChunkSet.AddChunkToSet(Hash2); + HashKeySet ChunkSet; + ChunkSet.AddHashToSet(Hash1); + ChunkSet.AddHashToSet(Hash2); Store->FilterChunks(ChunkSet); CHECK(ChunkSet.IsEmpty()); diff --git a/zenstore/cas.h b/zenstore/cas.h new file mode 100644 index 000000000..2ad160d28 --- /dev/null +++ b/zenstore/cas.h @@ -0,0 +1,61 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include <zencore/blake3.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/refcount.h> +#include <zencore/timer.h> +#include <zenstore/cidstore.h> +#include <zenstore/hashkeyset.h> + +#include <atomic> +#include <filesystem> +#include <functional> +#include <memory> +#include <string> +#include <unordered_set> + +namespace zen { + +class GcContext; +class GcManager; +class ScrubContext; + +/** Content Addressable Storage interface + + */ + +class CasStore +{ +public: + virtual ~CasStore() = default; + + const CidStoreConfiguration& Config() { return m_Config; } + + struct InsertResult + { + bool New = false; + }; + + virtual void Initialize(const CidStoreConfiguration& Config) = 0; + virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; + virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; + virtual void Flush() = 0; + virtual void Scrub(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; + virtual CidStoreSize TotalSize() const = 0; + +protected: + CidStoreConfiguration m_Config; + uint64_t m_LastScrubTime = 0; +}; + +ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); + +void CAS_forcelink(); + +} // namespace zen diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 03a56f010..9c5258bce 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. 
-#include <zenstore/cas.h> +#include <zenstore/caslog.h> #include "compactcas.h" diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 01eda4697..5a079dbed 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -7,8 +7,9 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> -#include <zenstore/cas.h> -#include <zenstore/caslog.h> +#include <zenstore/scrubcontext.h> + +#include "cas.h" #include <filesystem> @@ -18,153 +19,32 @@ struct CidStore::Impl { Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} - struct IndexEntry - { - IoHash Uncompressed; - IoHash Compressed; - }; - - CasStore& m_CasStore; - TCasLogFile<IndexEntry> m_LogFile; + CasStore& m_CasStore; - RwLock m_Lock; - tsl::robin_map<IoHash, IoHash> m_CidMap; + void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } - CidStore::InsertResult AddChunk(CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); - IoHash CompressedHash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); Payload.SetContentType(ZenContentType::kCompressedBinary); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, CompressedHash); - AddCompressedCid(DecompressedId, CompressedHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId); - return {.DecompressedId = DecompressedId, .CompressedHash = CompressedHash, .New = Result.New}; + return {.New = Result.New}; } - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) - { - ZEN_ASSERT(Compressed != IoHash::Zero); - - RwLock::ExclusiveLockScope _(m_Lock); + IoBuffer FindChunkByCid(const IoHash& DecompressedId) { return m_CasStore.FindChunk(DecompressedId); } - auto It = m_CidMap.try_emplace(DecompressedId, Compressed); - if (!It.second) - { - 
if (It.first.value() != Compressed) - { - It.first.value() = Compressed; - } - else - { - // No point logging an update that won't change anything - return; - } - } + bool ContainsChunk(const IoHash& DecompressedId) { return m_CasStore.ContainsChunk(DecompressedId); } - // It's not ideal to do this while holding the lock in case - // we end up in blocking I/O but that's for later - LogMapping(DecompressedId, Compressed); - } - - void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash) + void FilterChunks(HashKeySet& InOutChunks) { - ZEN_ASSERT(DecompressedId != CompressedHash); - m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash}); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return ContainsChunk(Hash); }); } - IoHash RemapCid(const IoHash& DecompressedId) - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - return It->second; - } - - return IoHash::Zero; - } - - IoBuffer FindChunkByCid(const IoHash& DecompressedId) - { - IoHash CompressedHash; - - { - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CompressedHash = It->second; - } - else - { - return {}; - } - } - - ZEN_ASSERT(CompressedHash != IoHash::Zero); - - return m_CasStore.FindChunk(CompressedHash); - } - - bool ContainsChunk(const IoHash& DecompressedId) - { - IoHash CasHash = IoHash::Zero; - - { - RwLock::SharedLockScope _(m_Lock); - if (const auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) - { - CasHash = It->second; - } - } - - return CasHash != IoHash::Zero ? m_CasStore.ContainsChunk(CasHash) : false; - } - - void InitializeIndex(const std::filesystem::path& RootDir) - { - CreateDirectories(RootDir); - std::filesystem::path SlogPath{RootDir / "cid.slog"}; - - bool IsNew = !std::filesystem::exists(SlogPath); - - m_LogFile.Open(SlogPath, IsNew ? 
CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - - ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize())); - - uint64_t TombstoneCount = 0; - uint64_t InvalidCount = 0; - - m_LogFile.Replay( - [&](const IndexEntry& Entry) { - if (Entry.Compressed != IoHash::Zero) - { - // Update - m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); - } - else - { - if (Entry.Uncompressed != IoHash::Zero) - { - // Tombstone - m_CidMap.erase(Entry.Uncompressed); - ++TombstoneCount; - } - else - { - // Completely uninitialized entry with both hashes set to zero indicates a - // problem. Might be an unwritten page due to BSOD or some other problem - ++InvalidCount; - } - } - }, - 0); - - ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); - } - - void Flush() { m_LogFile.Flush(); } + void Flush() { m_CasStore.Flush(); } void Scrub(ScrubContext& Ctx) { @@ -175,83 +55,7 @@ struct CidStore::Impl m_LastScrubTime = Ctx.ScrubTimestamp(); - CasChunkSet ChunkSet; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto& Kv : m_CidMap) - { - ChunkSet.AddChunkToSet(Kv.second); - } - } - - m_CasStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - // All good - we have all the chunks - return; - } - - ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. 
These mappings will be removed", - ChunkSet.GetSize(), - m_CidMap.size()); - - // Erase all mappings to chunks which are not present in the underlying CAS store - // we do this by removing mappings from the in-memory lookup structure and also - // by emitting tombstone records to the commit log - - std::vector<IoHash> BadChunks; - - { - RwLock::SharedLockScope _(m_Lock); - - for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) - { - if (ChunkSet.ContainsChunk(It->second)) - { - const IoHash& BadHash = It->first; - - // Log a tombstone record - LogMapping(BadHash, IoHash::Zero); - - BadChunks.push_back(BadHash); - - It = m_CidMap.erase(It); - } - else - { - ++It; - } - } - } - - m_LogFile.Flush(); - - // TODO: Should compute a snapshot index here - - Ctx.ReportBadCasChunks(BadChunks); - } - - void RemoveCids(CasChunkSet& CasChunks) - { - std::vector<IndexEntry> RemovedEntries; - RemovedEntries.reserve(CasChunks.GetSize()); - { - RwLock::ExclusiveLockScope _(m_Lock); - for (auto It = m_CidMap.begin(), End = m_CidMap.end(); It != End;) - { - if (CasChunks.ContainsChunk(It->second)) - { - RemovedEntries.push_back({It->first, IoHash::Zero}); - It = m_CidMap.erase(It); - continue; - } - ++It; - } - } - m_LogFile.Append(RemovedEntries); + m_CasStore.Scrub(Ctx); } uint64_t m_LastScrubTime = 0; @@ -259,25 +63,24 @@ struct CidStore::Impl ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) +CidStore::CidStore(GcManager& Gc) : m_CasStore(CreateCasStore(Gc)), m_Impl(std::make_unique<Impl>(*m_CasStore)) { - m_Impl->InitializeIndex(RootDir); } CidStore::~CidStore() { } -CidStore::InsertResult -CidStore::AddChunk(CompressedBuffer& ChunkData) +void +CidStore::Initialize(const CidStoreConfiguration& Config) { - return m_Impl->AddChunk(ChunkData); + m_Impl->Initialize(Config); } -void -CidStore::AddCompressedCid(const 
IoHash& DecompressedId, const IoHash& Compressed) +CidStore::InsertResult +CidStore::AddChunk(const CompressedBuffer& ChunkData) { - m_Impl->AddCompressedCid(DecompressedId, Compressed); + return m_Impl->AddChunk(ChunkData); } IoBuffer @@ -286,12 +89,6 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId) return m_Impl->FindChunkByCid(DecompressedId); } -IoHash -CidStore::RemapCid(const IoHash& DecompressedId) -{ - return m_Impl->RemapCid(DecompressedId); -} - bool CidStore::ContainsChunk(const IoHash& DecompressedId) { @@ -299,25 +96,25 @@ CidStore::ContainsChunk(const IoHash& DecompressedId) } void -CidStore::Flush() +CidStore::FilterChunks(HashKeySet& InOutChunks) { - m_Impl->Flush(); + return m_Impl->FilterChunks(InOutChunks); } void -CidStore::Scrub(ScrubContext& Ctx) +CidStore::Flush() { - m_Impl->Scrub(Ctx); + m_Impl->Flush(); } void -CidStore::RemoveCids(CasChunkSet& CasChunks) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->RemoveCids(CasChunks); + m_Impl->Scrub(Ctx); } -CasStoreSize -CidStore::CasSize() const +CidStoreSize +CidStore::TotalSize() const { return m_Impl->m_CasStore.TotalSize(); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5aed02e7f..a7fdfa1f5 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -2,13 +2,16 @@ #include "compactcas.h" -#include <zenstore/cas.h> +#include "cas.h" +#include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zenstore/scrubcontext.h> + #include <gsl/gsl-lite.hpp> #include <xxhash.h> @@ -76,94 +79,6 @@ namespace { return GetBasePath(RootPath, ContainerBaseName) / "blocks"; } - std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + LogExtension); - } - - std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, 
const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + ".ucas"); - } - - std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) - { - return RootPath / (ContainerBaseName + IndexExtension); - } - - struct LegacyCasDiskLocation - { - LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize) - { - ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); - ZEN_ASSERT(InSize <= 0xff'ffff'ffff); - - memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); - memcpy(&m_Size[0], &InSize, sizeof m_Size); - } - - LegacyCasDiskLocation() = default; - - inline uint64_t GetOffset() const - { - uint64_t Offset = 0; - memcpy(&Offset, &m_Offset, sizeof m_Offset); - return Offset; - } - - inline uint64_t GetSize() const - { - uint64_t Size = 0; - memcpy(&Size, &m_Size, sizeof m_Size); - return Size; - } - - private: - uint8_t m_Offset[5]; - uint8_t m_Size[5]; - }; - - struct LegacyCasDiskIndexEntry - { - static const uint8_t kTombstone = 0x01; - - IoHash Key; - LegacyCasDiskLocation Location; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - uint8_t Flags = 0; - }; - - bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason) - { - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - return true; - } - if (Entry.ContentType != ZenContentType::kUnknownContentType) - { - OutReason = - fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); - return false; - } - uint64_t Size = Entry.Location.GetSize(); - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, 
Entry.Key.ToHexString()); - return false; - } - return true; - } - bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -199,10 +114,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// -CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) -: GcStorage(Gc) -, m_Config(Config) -, m_Log(logging::Get("containercas")) +CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("containercas")) { } @@ -211,16 +123,21 @@ CasContainerStrategy::~CasContainerStrategy() } void -CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) +CasContainerStrategy::Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); ZEN_ASSERT(MaxBlockSize > 0); + m_RootDirectory = RootDirectory; m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; m_MaxBlockSize = MaxBlockSize; - m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); + m_BlocksBasePath = GetBlocksBasePath(m_RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); @@ -267,6 +184,9 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const CasStore::InsertResult CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } @@ -293,7 +213,7 @@ CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) } void -CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) +CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) { // This implementation is good 
enough for relatively small // chunk sets (in terms of chunk identifiers), but would @@ -302,7 +222,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) // we're likely to already have a large proportion of the // chunks in the set - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -316,6 +236,7 @@ void CasContainerStrategy::Scrub(ScrubContext& Ctx) { std::vector<IoHash> BadKeys; + uint64_t ChunkCount{0}, ChunkBytes{0}; std::vector<BlockStoreLocation> ChunkLocations; std::vector<IoHash> ChunkIndexToChunkHash; @@ -337,6 +258,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; if (!Data) { @@ -344,66 +268,97 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) BadKeys.push_back(Hash); return; } - const IoHash ComputedHash = IoHash::HashBuffer(Data, Size); - if (ComputedHash != Hash) + + IoBuffer Buffer(IoBuffer::Wrap, Data, Size); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(Data, Size); + if (ComputedHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); return; } +#endif + BadKeys.push_back(Hash); }; const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ++ChunkCount; + ChunkBytes += Size; + + const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; + IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size); + // TODO: Add API to verify compressed buffer without having to memorymap the whole file + if 
(CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadKeys.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS IoHashStream Hasher; - File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); - IoHash ComputedHash = Hasher.GetHash(); - const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex]; - if (ComputedHash != Hash) + File.StreamByteRange(Offset, Size, [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + if (ComputedHash == Hash) { - // Hash mismatch - BadKeys.push_back(Hash); return; } +#endif + BadKeys.push_back(Hash); }; m_BlockStore.IterateChunks(ChunkLocations, ValidateSmallChunk, ValidateLargeChunk); _.ReleaseNow(); - if (BadKeys.empty()) - { - return; - } - - ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_Config.RootDirectory / m_ContainerBaseName); + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - if (Ctx.RunRecovery()) + if (!BadKeys.empty()) { - // Deal with bad chunks by removing them from our lookup map + ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(BadKeys.size()); + if (Ctx.RunRecovery()) { - RwLock::ExclusiveLockScope __(m_LocationMapLock); - for (const IoHash& ChunkHash : BadKeys) + // Deal with bad chunks by removing them from our lookup map + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BadKeys.size()); { - const auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt == m_LocationMap.end()) + RwLock::ExclusiveLockScope __(m_LocationMapLock); + for (const IoHash& ChunkHash : BadKeys) { - // Might have been GC'd - continue; + const auto KeyIt = m_LocationMap.find(ChunkHash); + if (KeyIt == m_LocationMap.end()) + { + // Might 
have been GC'd + continue; + } + LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); + m_LocationMap.erase(KeyIt); } - LogEntries.push_back({.Key = KeyIt->first, .Location = KeyIt->second, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(KeyIt); } + m_CasLog.Append(LogEntries); } - m_CasLog.Append(LogEntries); } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadCasChunks(BadKeys); + Ctx.ReportBadCidChunks(BadKeys); + + ZEN_INFO("compact cas scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -432,7 +387,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) // do a blocking operation and update the m_LocationMap after each new block is // written and figuring out the path to the next new block. - ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t WriteBlockTimeUs = 0; uint64_t WriteBlockLongestTimeUs = 0; @@ -468,7 +423,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) ChunkLocations.reserve(TotalChunkCount); ChunkIndexToChunkHash.reserve(TotalChunkCount); - GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { auto KeyIt = LocationMap.find(ChunkHash); const BlockStoreDiskLocation& DiskLocation = KeyIt->second; BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment); @@ -539,26 +494,26 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) }, [&GcCtx]() { return GcCtx.CollectSmallObjects(); }); - GcCtx.DeletedCas(DeletedChunks); + GcCtx.AddDeletedCids(DeletedChunks); } void CasContainerStrategy::MakeIndexSnapshot() { - ZEN_INFO("write store snapshot for '{}'", 
m_Config.RootDirectory / m_ContainerBaseName); + ZEN_INFO("write store snapshot for '{}'", m_RootDirectory / m_ContainerBaseName); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + m_RootDirectory / m_ContainerBaseName, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_RootDirectory, m_ContainerBaseName); // Move index away, we keep it if something goes wrong if (fs::is_regular_file(TempIndexPath)) @@ -629,13 +584,13 @@ uint64_t CasContainerStrategy::ReadIndexFile() { std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -682,13 +637,13 @@ uint64_t CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) { std::vector<CasDiskIndexEntry> Entries; - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); if (std::filesystem::is_regular_file(LogPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, + 
m_RootDirectory / m_ContainerBaseName, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); @@ -727,208 +682,6 @@ CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) return 0; } -uint64_t -CasContainerStrategy::MigrateLegacyData(bool CleanSource) -{ - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - - if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) - { - return 0; - } - - ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName); - - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); - std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - - uint64_t MigratedChunkCount = 0; - uint32_t MigratedBlockCount = 0; - Stopwatch MigrationTimer; - uint64_t TotalSize = 0; - const auto _ = MakeGuard([&] { - ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", - m_Config.RootDirectory / m_ContainerBaseName, - MigratedChunkCount, - MigratedBlockCount, - NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), - NiceBytes(TotalSize)); - }); - - uint64_t BlockFileSize = 0; - { - BasicFile BlockFile; - BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - BlockFileSize = BlockFile.FileSize(); - } - - std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; - uint64_t InvalidEntryCount = 0; - - TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; - LegacyCasLog.Open(LegacyLogPath, CleanSource ? 
CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); - { - Stopwatch Timer; - const auto __ = MakeGuard([&] { - ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", - m_Config.RootDirectory / m_ContainerBaseName, - LegacyDiskIndex.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - if (LegacyCasLog.Initialize()) - { - LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); - LegacyCasLog.Replay( - [&](const LegacyCasDiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) - { - LegacyDiskIndex.erase(Record.Key); - return; - } - if (!ValidateLegacyEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); - InvalidEntryCount++; - return; - } - LegacyDiskIndex.insert_or_assign(Record.Key, Record); - }, - 0); - - std::vector<IoHash> BadEntries; - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskIndexEntry& Record(Entry.second); - if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize) - { - continue; - } - ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); - BadEntries.push_back(Entry.first); - } - for (const IoHash& BadHash : BadEntries) - { - LegacyDiskIndex.erase(BadHash); - } - InvalidEntryCount += BadEntries.size(); - } - } - - if (InvalidEntryCount) - { - ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName); - } - - if (LegacyDiskIndex.empty()) - { - LegacyCasLog.Close(); - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. 
- - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); - } - return 0; - } - - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); - CreateDirectories(LogPath.parent_path()); - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash; - std::vector<BlockStoreLocation> ChunkLocations; - ChunkIndexToChunkHash.reserve(LegacyDiskIndex.size()); - ChunkLocations.reserve(LegacyDiskIndex.size()); - for (const auto& Entry : LegacyDiskIndex) - { - const LegacyCasDiskLocation& Location = Entry.second.Location; - const IoHash& ChunkHash = Entry.first; - size_t ChunkIndex = ChunkLocations.size(); - ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.GetOffset(), .Size = Location.GetSize()}); - ChunkIndexToChunkHash[ChunkIndex] = ChunkHash; - TotalSize += Location.GetSize(); - } - m_BlockStore.Split( - ChunkLocations, - LegacyDataPath, - m_BlocksBasePath, - m_MaxBlockSize, - BlockStoreDiskLocation::MaxBlockIndex + 1, - m_PayloadAlignment, - CleanSource, - [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount]( - const BlockStore::MovedChunksArray& MovedChunks) { - std::vector<CasDiskIndexEntry> LogEntries; - LogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const BlockStoreLocation& NewLocation = Entry.second; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - LogEntries.push_back({.Key = ChunkHash, - .Location = {NewLocation, m_PayloadAlignment}, - .ContentType = OldEntry.ContentType, - .Flags = OldEntry.Flags}); 
- } - for (const CasDiskIndexEntry& Entry : LogEntries) - { - m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); - } - CasLog.Append(LogEntries); - CasLog.Flush(); - if (CleanSource) - { - std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; - LegacyLogEntries.reserve(MovedChunks.size()); - for (const auto& Entry : MovedChunks) - { - size_t ChunkIndex = Entry.first; - const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex]; - const LegacyCasDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash]; - LegacyLogEntries.push_back( - LegacyCasDiskIndexEntry{.Key = ChunkHash, - .Location = OldEntry.Location, - .ContentType = OldEntry.ContentType, - .Flags = (uint8_t)(OldEntry.Flags | LegacyCasDiskIndexEntry::kTombstone)}); - } - LegacyCasLog.Append(LegacyLogEntries); - LegacyCasLog.Flush(); - } - MigratedBlockCount++; - MigratedChunkCount += MovedChunks.size(); - }); - - LegacyCasLog.Close(); - CasLog.Close(); - - if (CleanSource) - { - // Older versions of CasContainerStrategy expects the legacy files to exist if it can find - // a CAS manifest and crashes on startup if they don't. - // In order to not break startup when switching back an older version, lets just reset - // the legacy data files to zero length. 
- - BasicFile LegacyLog; - LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); - BasicFile LegacySobs; - LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); - BasicFile LegacySidx; - LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); - } - return MigratedChunkCount; -} - void CasContainerStrategy::OpenContainer(bool IsNewStore) { @@ -937,25 +690,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_LocationMap.clear(); - std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path BasePath = GetBasePath(m_RootDirectory, m_ContainerBaseName); if (IsNewStore) { - std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); - std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - - std::filesystem::remove(LegacyLogPath); - std::filesystem::remove(LegacyDataPath); std::filesystem::remove_all(BasePath); } - uint64_t LogPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(LogPosition); - uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); CreateDirectories(BasePath); - std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LogPath = GetLogPath(m_RootDirectory, m_ContainerBaseName); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); std::vector<BlockStoreLocation> KnownLocations; @@ -969,7 +716,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_BlockStore.Initialize(m_BlocksBasePath, m_MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations); - if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) + if (IsNewStore || (LogEntryCount > 0)) { MakeIndexSnapshot(); } @@ -1040,18 +787,14 @@ TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - 
CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - const int kIterationCount = 1000; std::vector<IoHash> Keys(kIterationCount); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -1083,9 +826,9 @@ TEST_CASE("compactcas.compact.gc") // the original cas store { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -1109,18 +852,13 @@ TEST_CASE("compactcas.compact.totalsize") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1024; const int32_t kChunkCount = 16; { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, true); for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) { @@ -1135,9 +873,9 @@ TEST_CASE("compactcas.compact.totalsize") } { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); @@ -1145,9 +883,9 @@ TEST_CASE("compactcas.compact.totalsize") // Re-open again, this time we should have a snapshot { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 65536, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 65536, 16, 
false); const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); @@ -1159,13 +897,9 @@ TEST_CASE("compactcas.gc.basic") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); @@ -1186,16 +920,12 @@ TEST_CASE("compactcas.gc.removefile") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - IoBuffer Chunk = CreateChunk(128); IoHash ChunkHash = IoHash::HashBuffer(Chunk); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, true); const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); CHECK(InsertResult.New); @@ -1204,9 +934,9 @@ TEST_CASE("compactcas.gc.removefile") Cas.Flush(); } - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 65536, 1 << 4, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 65536, 1 << 4, false); GcContext GcCtx; GcCtx.CollectSmallObjects(true); @@ -1222,13 +952,9 @@ TEST_CASE("compactcas.gc.compact") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("cb", 2048, 1 << 4, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "cb", 2048, 1 << 4, true); 
uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; std::vector<IoBuffer> Chunks; @@ -1275,7 +1001,7 @@ TEST_CASE("compactcas.gc.compact") std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1308,7 +1034,7 @@ TEST_CASE("compactcas.gc.compact") GcCtx.CollectSmallObjects(true); std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1342,7 +1068,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1377,7 +1103,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1414,7 +1140,7 @@ TEST_CASE("compactcas.gc.compact") KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1476,13 +1202,10 @@ TEST_CASE("compactcas.gc.deleteblockonopen") ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) 
{ @@ -1498,7 +1221,7 @@ TEST_CASE("compactcas.gc.deleteblockonopen") { KeepChunks.push_back(ChunkHashes[i]); } - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); Cas.Flush(); Cas.CollectGarbage(GcCtx); @@ -1513,9 +1236,9 @@ TEST_CASE("compactcas.gc.deleteblockonopen") } { // Re-open - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, false); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, false); for (size_t i = 0; i < 20; i += 2) { @@ -1545,13 +1268,9 @@ TEST_CASE("compactcas.gc.handleopeniobuffer") ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); } - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 1024, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 1024, 16, true); for (size_t i = 0; i < 20; i++) { @@ -1574,131 +1293,12 @@ TEST_CASE("compactcas.gc.handleopeniobuffer") CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); } -TEST_CASE("compactcas.legacyconversion") -{ - ScopedTemporaryDirectory TempDir; - - uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048}; - size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); - size_t SingleBlockSize = 0; - std::vector<IoBuffer> Chunks; - Chunks.reserve(ChunkCount); - for (uint64_t Size : ChunkSizes) - { - Chunks.push_back(CreateChunk(Size)); - SingleBlockSize += Size; - } - - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunkCount); - for (const IoBuffer& Chunk : Chunks) - { - ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); - } - - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); - - { - CasGc Gc; - 
CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true); - - for (size_t i = 0; i < ChunkCount; i++) - { - CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); - } - - std::vector<IoHash> KeepChunks; - for (size_t i = 0; i < ChunkCount; i += 2) - { - KeepChunks.push_back(ChunkHashes[i]); - } - GcContext GcCtx; - GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepChunks); - Cas.Flush(); - Gc.CollectGarbage(GcCtx); - } - - std::filesystem::path BlockPath = BlockStore::GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); - std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); - std::filesystem::rename(BlockPath, LegacyDataPath); - - std::vector<CasDiskIndexEntry> LogEntries; - std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test"); - if (std::filesystem::is_regular_file(IndexPath)) - { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CasDiskIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); - CasDiskIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && - Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) - { - LogEntries.resize(Header.EntryCount); - ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); - } - } - ObjectIndexFile.Close(); - std::filesystem::remove(IndexPath); - } - - std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test"); - { - TCasLogFile<CasDiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - LogEntries.reserve(CasLog.GetLogCount()); - CasLog.Replay([&](const 
CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); - } - TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; - std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); - LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); - - for (const CasDiskIndexEntry& Entry : LogEntries) - { - BlockStoreLocation Location = Entry.Location.Get(16); - LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); - LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, - .Location = LegacyLocation, - .ContentType = Entry.ContentType, - .Flags = Entry.Flags}; - LegacyCasLog.Append(LegacyEntry); - } - LegacyCasLog.Close(); - - std::filesystem::remove_all(CasConfig.RootDirectory / "test"); - - { - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 2048, 16, false); - - for (size_t i = 0; i < ChunkCount; i += 2) - { - CHECK(Cas.HaveChunk(ChunkHashes[i])); - CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); - CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); - } - } -} - TEST_CASE("compactcas.threadedinsert") { // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path(); - - CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1048; const int32_t kChunkCount = 4096; uint64_t ExpectedSize = 0; @@ -1724,9 +1324,9 @@ TEST_CASE("compactcas.threadedinsert") std::atomic<size_t> WorkCompleted = 0; WorkerThreadPool ThreadPool(4); - CasGc Gc; - CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 32768, 16, true); + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(TempDir.Path(), "test", 32768, 16, true); { for (const auto& Chunk : Chunks) { @@ -1838,10 +1438,10 @@ TEST_CASE("compactcas.threadedinsert") GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); 
Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) @@ -1879,10 +1479,10 @@ TEST_CASE("compactcas.threadedinsert") GcContext GcCtx; GcCtx.CollectSmallObjects(true); - GcCtx.ContributeCas(KeepHashes); + GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); - CasChunkSet& Deleted = GcCtx.DeletedCas(); - Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + const HashKeySet& Deleted = GcCtx.DeletedCids(); + Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } { WorkCompleted = 0; @@ -1902,53 +1502,6 @@ TEST_CASE("compactcas.threadedinsert") } } -TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true)) -{ - if (true) - { - return; - } - const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; - std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); - std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); - std::filesystem::remove_all(TobsBasePath); - std::filesystem::remove_all(SobsBasePath); - - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = BigDataPath; - uint64_t TObsSize = 0; - { - CasGc TobsCasGc; - CasContainerStrategy TobsCas(CasConfig, TobsCasGc); - TobsCas.Initialize("tobs", 1u << 28, 16, false); - TObsSize = TobsCas.StorageSize().DiskSize; - CHECK(TObsSize > 0); - } - - uint64_t SObsSize = 0; - { - CasGc SobsCasGc; - CasContainerStrategy SobsCas(CasConfig, SobsCasGc); - SobsCas.Initialize("sobs", 1u << 30, 4096, false); - SObsSize = SobsCas.StorageSize().DiskSize; - CHECK(SObsSize > 0); - } - - CasGc TobsCasGc; - CasContainerStrategy TobsCas(CasConfig, 
TobsCasGc); - TobsCas.Initialize("tobs", 1u << 28, 16, false); - GcContext TobsGcCtx; - TobsCas.CollectGarbage(TobsGcCtx); - CHECK(TobsCas.StorageSize().DiskSize == TObsSize); - - CasGc SobsCasGc; - CasContainerStrategy SobsCas(CasConfig, SobsCasGc); - SobsCas.Initialize("sobs", 1u << 30, 4096, false); - GcContext SobsGcCtx; - SobsCas.CollectGarbage(SobsGcCtx); - CHECK(SobsCas.StorageSize().DiskSize == SObsSize); -} - #endif void diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 114a6a48c..2acac7ca3 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -4,10 +4,11 @@ #include <zencore/zencore.h> #include <zenstore/blockstore.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> +#include "cas.h" + #include <atomic> #include <limits> #include <unordered_map> @@ -47,14 +48,18 @@ static_assert(sizeof(CasDiskIndexEntry) == 32); struct CasContainerStrategy final : public GcStorage { - CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc); + CasContainerStrategy(GcManager& Gc); ~CasContainerStrategy(); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(CasChunkSet& InOutChunks); - void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); + void FilterChunks(HashKeySet& InOutChunks); + void Initialize(const std::filesystem::path& RootDirectory, + const std::string_view ContainerBaseName, + uint32_t MaxBlockSize, + uint64_t Alignment, + bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; @@ -65,12 +70,11 @@ private: void MakeIndexSnapshot(); uint64_t ReadIndexFile(); uint64_t ReadLog(uint64_t SkipEntryCount); - uint64_t MigrateLegacyData(bool CleanSource); void OpenContainer(bool IsNewStore); spdlog::logger& Log() { return m_Log; } - const 
CasStoreConfiguration& m_Config; + std::filesystem::path m_RootDirectory; spdlog::logger& m_Log; uint64_t m_PayloadAlignment = 1u << 4; uint64_t m_MaxBlockSize = 1u << 28; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index d074a906f..23e3f4cd8 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -2,6 +2,7 @@ #include "filecas.h" +#include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -16,6 +17,7 @@ #include <zencore/uid.h> #include <zenstore/basicfile.h> #include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -71,10 +73,7 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo ////////////////////////////////////////////////////////////////////////// -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc) -: GcStorage(Gc) -, m_Config(Config) -, m_Log(logging::Get("filecas")) +FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) { } @@ -83,17 +82,19 @@ FileCasStrategy::~FileCasStrategy() } void -FileCasStrategy::Initialize(bool IsNewStore) +FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) { m_IsInitialized = true; - CreateDirectories(m_Config.RootDirectory); + m_RootDirectory = RootDirectory; - m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); + CreateDirectories(m_RootDirectory); + + m_CasLog.Open(m_RootDirectory / "cas.ulog", IsNewStore ? 
CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); Stopwatch Timer; const auto _ = MakeGuard([&] { - ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); + ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); std::unordered_set<IoHash> FoundEntries; @@ -127,13 +128,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); @@ -340,7 +345,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -485,7 +490,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -497,7 +502,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); @@ -513,7 +518,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash) void 
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { - ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); if (Ec) @@ -534,7 +539,7 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) } void -FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) +FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) { ZEN_ASSERT(m_IsInitialized); @@ -546,7 +551,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case - InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -602,12 +607,12 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& const std::filesystem::path& RootDirectory; std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; - } CasVisitor{m_Config.RootDirectory}; + } CasVisitor{m_RootDirectory}; CasVisitor.Callback = std::move(Callback); FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor); + Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); } void @@ -630,21 +635,34 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { ZEN_ASSERT(m_IsInitialized); - std::vector<IoHash> BadHashes; - std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + std::vector<IoHash> BadHashes; + uint64_t ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + ++ChunkCount; + ChunkBytes += Payload.FileSize(); + + IoBuffer Buffer(IoBuffer::BorrowedFile, Payload.Handle(), 0, Payload.FileSize()); + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); Compressed) + { + if 
(IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + // Hash mismatch + BadHashes.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS IoHashStream Hasher; - Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + Payload.StreamByteRange(0, Payload.FileSize(), [&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); IoHash ComputedHash = Hasher.GetHash(); - - if (ComputedHash != Hash) + if (ComputedHash == Hash) { - BadHashes.push_back(Hash); + return; } - - ++ChunkCount; - ChunkBytes.fetch_add(Payload.FileSize()); +#endif + BadHashes.push_back(Hash); }); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); @@ -670,9 +688,12 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadCasChunks(BadHashes); + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadHashes); - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); } void @@ -680,7 +701,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_ASSERT(m_IsInitialized); - ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory); + ZEN_INFO("collecting garbage from {}", m_RootDirectory); std::vector<IoHash> ChunksToDelete; std::atomic<uint64_t> ChunksToDeleteBytes{0}; @@ -694,7 +715,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", - m_Config.RootDirectory, + m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), DeletedCount, ChunkCount, @@ -706,7 +727,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) bool KeepThis = false; CandidateCas.clear(); CandidateCas.push_back(Hash); - 
GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) { + GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); @@ -725,12 +746,12 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (ChunksToDelete.empty()) { - ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory); + ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); return; } ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})", - m_Config.RootDirectory, + m_RootDirectory, ChunksToDelete.size(), ChunkCount.load(), NiceBytes(ChunksToDeleteBytes)); @@ -751,13 +772,13 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (Ec) { - ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message()); + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); continue; } DeletedCount++; } - GcCtx.DeletedCas(ChunksToDelete); + GcCtx.AddDeletedCids(ChunksToDelete); } ////////////////////////////////////////////////////////////////////////// @@ -769,13 +790,10 @@ TEST_CASE("cas.file.move") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasGc Gc; + GcManager Gc; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - FileCasStrategy FileCas(CasConfig, Gc); - FileCas.Initialize(/* IsNewStore */ true); + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; @@ -850,12 +868,9 @@ TEST_CASE("cas.file.gc") // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - CasStoreConfiguration CasConfig; - CasConfig.RootDirectory = TempDir.Path() / "cas"; - - CasGc Gc; - FileCasStrategy FileCas(CasConfig, 
Gc); - FileCas.Initialize(/* IsNewStore */ true); + GcManager Gc; + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); const int kIterationCount = 1000; std::vector<IoHash> Keys{kIterationCount}; @@ -903,7 +918,7 @@ TEST_CASE("cas.file.gc") { if (Key.Hash[0] & 1) { - Ctx.ContributeCas(std::vector<IoHash>{Key}); + Ctx.AddRetainedCids(std::vector<IoHash>{Key}); } } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index ef67ae9eb..f14e5d057 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -8,10 +8,11 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/thread.h> -#include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> +#include "cas.h" + #include <atomic> #include <functional> @@ -28,28 +29,28 @@ class BasicFile; struct FileCasStrategy final : public GcStorage { - FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc); + FileCasStrategy(GcManager& Gc); ~FileCasStrategy(); - void Initialize(bool IsNewStore); + void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); - void FilterChunks(CasChunkSet& InOutChunks); + void FilterChunks(HashKeySet& InOutChunks); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } private: - const CasStoreConfiguration& m_Config; - RwLock m_Lock; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } - std::atomic_uint64_t m_TotalSize{}; - bool m_IsInitialized = false; 
+ std::filesystem::path m_RootDirectory; + RwLock m_Lock; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + std::atomic_uint64_t m_TotalSize{}; + bool m_IsInitialized = false; struct FileCasIndexEntry { diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index bb03b9751..0902abf4a 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -14,9 +14,10 @@ #include <zencore/testing.h> #include <zencore/testutils.h> #include <zencore/timer.h> -#include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include "cas.h" + #include <fmt/format.h> #include <filesystem> @@ -173,9 +174,8 @@ struct GcContext::GcState using CacheKeyContexts = std::unordered_map<std::string, std::vector<IoHash>>; CacheKeyContexts m_ExpiredCacheKeys; - CasChunkSet m_CasChunks; - CasChunkSet m_DeletedCasChunks; - CasChunkSet m_CidChunks; + HashKeySet m_RetainedCids; + HashKeySet m_DeletedCids; GcClock::TimePoint m_GcTime; GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24); bool m_DeletionMode = true; @@ -194,19 +194,13 @@ GcContext::~GcContext() } void -GcContext::ContributeCids(std::span<const IoHash> Cids) -{ - m_State->m_CidChunks.AddChunksToSet(Cids); -} - -void -GcContext::ContributeCas(std::span<const IoHash> Cas) +GcContext::AddRetainedCids(std::span<const IoHash> Cids) { - m_State->m_CasChunks.AddChunksToSet(Cas); + m_State->m_RetainedCids.AddHashesToSet(Cids); } void -GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) +GcContext::SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys) { m_State->m_ExpiredCacheKeys[CacheKeyContext] = std::move(ExpiredKeys); } @@ -214,37 +208,31 @@ GcContext::ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<I void GcContext::IterateCids(std::function<void(const IoHash&)> Callback) { - m_State->m_CidChunks.IterateChunks([&](const IoHash& 
Hash) { Callback(Hash); }); + m_State->m_RetainedCids.IterateHashes([&](const IoHash& Hash) { Callback(Hash); }); } void GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc) { - m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); + m_State->m_RetainedCids.FilterHashes(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); }); } void -GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc) +GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc) { - m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); }); + m_State->m_RetainedCids.FilterHashes(Cid, std::move(FilterFunc)); } void -GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc) +GcContext::AddDeletedCids(std::span<const IoHash> Cas) { - m_State->m_CasChunks.FilterChunks(Cas, std::move(FilterFunc)); + m_State->m_DeletedCids.AddHashesToSet(Cas); } -void -GcContext::DeletedCas(std::span<const IoHash> Cas) +const HashKeySet& +GcContext::DeletedCids() { - m_State->m_DeletedCasChunks.AddChunksToSet(Cas); -} - -CasChunkSet& -GcContext::DeletedCas() -{ - return m_State->m_DeletedCasChunks; + return m_State->m_DeletedCids; } std::span<const IoHash> @@ -318,7 +306,7 @@ GcContext::ClaimGCReserve() ////////////////////////////////////////////////////////////////////////// -GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) +GcContributor::GcContributor(GcManager& Gc) : m_Gc(Gc) { m_Gc.AddGcContributor(this); } @@ -330,7 +318,7 @@ GcContributor::~GcContributor() ////////////////////////////////////////////////////////////////////////// -GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) +GcStorage::GcStorage(GcManager& Gc) : m_Gc(Gc) { m_Gc.AddGcStorage(this); } @@ -342,30 +330,30 @@ GcStorage::~GcStorage() ////////////////////////////////////////////////////////////////////////// -CasGc::CasGc() 
+GcManager::GcManager() { } -CasGc::~CasGc() +GcManager::~GcManager() { } void -CasGc::AddGcContributor(GcContributor* Contributor) +GcManager::AddGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); m_GcContribs.push_back(Contributor); } void -CasGc::RemoveGcContributor(GcContributor* Contributor) +GcManager::RemoveGcContributor(GcContributor* Contributor) { RwLock::ExclusiveLockScope _(m_Lock); std::erase_if(m_GcContribs, [&](GcContributor* $) { return $ == Contributor; }); } void -CasGc::AddGcStorage(GcStorage* Storage) +GcManager::AddGcStorage(GcStorage* Storage) { ZEN_ASSERT(Storage != nullptr); RwLock::ExclusiveLockScope _(m_Lock); @@ -373,14 +361,14 @@ CasGc::AddGcStorage(GcStorage* Storage) } void -CasGc::RemoveGcStorage(GcStorage* Storage) +GcManager::RemoveGcStorage(GcStorage* Storage) { RwLock::ExclusiveLockScope _(m_Lock); std::erase_if(m_GcStorage, [&](GcStorage* $) { return $ == Storage; }); } void -CasGc::CollectGarbage(GcContext& GcCtx) +GcManager::CollectGarbage(GcContext& GcCtx) { RwLock::SharedLockScope _(m_Lock); @@ -394,36 +382,6 @@ CasGc::CollectGarbage(GcContext& GcCtx) } } - // Cache records reference CAS chunks with the uncompressed - // raw hash (Cid). Map the content ID to CAS hash to enable - // the CAS storage backends to filter valid chunks. 
- - if (CidStore* CidStore = m_CidStore) - { - std::vector<IoHash> CasHashes; - uint64_t UnknownChunks = 0; - - GcCtx.IterateCids([&](const IoHash& Cid) { - IoHash Cas = CidStore->RemapCid(Cid); - - if (Cas == IoHash::Zero) - { - ++UnknownChunks; - } - else - { - CasHashes.push_back(Cas); - } - }); - - if (UnknownChunks) - { - ZEN_WARN("found {} unknown CIDs", UnknownChunks); - } - - GcCtx.ContributeCas(CasHashes); - } - // Then trim storage { @@ -434,61 +392,48 @@ CasGc::CollectGarbage(GcContext& GcCtx) Storage->CollectGarbage(GcCtx); } } +} + +GcStorageSize +GcManager::TotalStorageSize() const +{ + RwLock::SharedLockScope _(m_Lock); - // Remove Cid to CAS hash mappings. Scrub? + GcStorageSize TotalSize; - if (CidStore* CidStore = m_CidStore) + for (GcStorage* Storage : m_GcStorage) { - Stopwatch Timer; - const auto Guard = MakeGuard([&] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - CidStore->RemoveCids(GcCtx.DeletedCas()); + const auto Size = Storage->StorageSize(); + TotalSize.DiskSize += Size.DiskSize; + TotalSize.MemorySize += Size.MemorySize; } -} -void -CasGc::SetCidStore(CidStore* Cids) -{ - m_CidStore = Cids; + return TotalSize; } +#if ZEN_USE_REF_TRACKING void -CasGc::OnNewCidReferences(std::span<IoHash> Hashes) +GcManager::OnNewCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } void -CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes) +GcManager::OnCommittedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } void -CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes) +GcManager::OnDroppedCidReferences(std::span<IoHash> Hashes) { ZEN_UNUSED(Hashes); } - -GcStorageSize -CasGc::TotalStorageSize() const -{ - RwLock::SharedLockScope _(m_Lock); - - GcStorageSize TotalSize; - - for (GcStorage* Storage : m_GcStorage) - { - const auto Size = Storage->StorageSize(); - TotalSize.DiskSize += Size.DiskSize; - TotalSize.MemorySize += Size.MemorySize; - } - - return TotalSize; -} 
+#endif ////////////////////////////////////////////////////////////////////////// -GcScheduler::GcScheduler(CasGc& CasGc) : m_Log(logging::Get("gc")), m_CasGc(CasGc) +GcScheduler::GcScheduler(GcManager& GcManager) : m_Log(logging::Get("gc")), m_GcManager(GcManager) { } @@ -606,7 +551,7 @@ GcScheduler::SchedulerThread() { std::error_code Ec; DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Ec); - GcStorageSize TotalSize = m_CasGc.TotalStorageSize(); + GcStorageSize TotalSize = m_GcManager.TotalStorageSize(); std::chrono::seconds RemaingTime = std::chrono::duration_cast<std::chrono::seconds>(m_NextGcTime - GcClock::Now()); if (RemaingTime < std::chrono::seconds::zero()) @@ -668,7 +613,7 @@ GcScheduler::SchedulerThread() Stopwatch Timer; const auto __ = MakeGuard([&] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_CasGc.CollectGarbage(GcCtx); + m_GcManager.CollectGarbage(GcCtx); m_LastGcTime = GcClock::Now(); m_NextGcTime = NextGcTime(m_LastGcTime); @@ -745,38 +690,37 @@ TEST_CASE("gc.basic") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration CasConfig; + CidStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - CasGc Gc; - std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); - CidStore CidStore{*CasStore, TempDir.Path() / "cid"}; + GcManager Gc; + CidStore CidStore(Gc); - CasStore->Initialize(CasConfig); - Gc.SetCidStore(&CidStore); + CidStore.Initialize(CasConfig); IoBuffer Chunk = CreateChunk(128); auto CompressedChunk = Compress(Chunk); const auto InsertResult = CidStore.AddChunk(CompressedChunk); + CHECK(InsertResult.New); GcContext GcCtx; GcCtx.CollectSmallObjects(true); - CasStore->Flush(); + CidStore.Flush(); Gc.CollectGarbage(GcCtx); - CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId)); + CHECK(!CidStore.ContainsChunk(IoHash::FromBLAKE3(CompressedChunk.GetRawHash()))); } TEST_CASE("gc.full") { ScopedTemporaryDirectory TempDir; - CasStoreConfiguration 
CasConfig; + CidStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path() / "cas"; - CasGc Gc; + GcManager Gc; std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); CasStore->Initialize(CasConfig); @@ -813,7 +757,7 @@ TEST_CASE("gc.full") CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); - CasStoreSize InitialSize = CasStore->TotalSize(); + CidStoreSize InitialSize = CasStore->TotalSize(); // Keep first and last { @@ -823,7 +767,7 @@ TEST_CASE("gc.full") std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[0]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -856,7 +800,7 @@ TEST_CASE("gc.full") GcCtx.CollectSmallObjects(true); std::vector<IoHash> KeepChunks; KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -890,7 +834,7 @@ TEST_CASE("gc.full") KeepChunks.push_back(ChunkHashes[1]); KeepChunks.push_back(ChunkHashes[4]); KeepChunks.push_back(ChunkHashes[7]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); @@ -925,7 +869,7 @@ TEST_CASE("gc.full") KeepChunks.push_back(ChunkHashes[6]); KeepChunks.push_back(ChunkHashes[7]); KeepChunks.push_back(ChunkHashes[8]); - GcCtx.ContributeCas(KeepChunks); + GcCtx.AddRetainedCids(KeepChunks); CasStore->Flush(); Gc.CollectGarbage(GcCtx); diff --git a/zenstore/hashkeyset.cpp b/zenstore/hashkeyset.cpp new file mode 100644 index 000000000..a5436f5cb --- /dev/null +++ b/zenstore/hashkeyset.cpp @@ -0,0 +1,60 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#include <zenstore/hashkeyset.h> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +void +HashKeySet::AddHashToSet(const IoHash& HashToAdd) +{ + m_HashSet.insert(HashToAdd); +} + +void +HashKeySet::AddHashesToSet(std::span<const IoHash> HashesToAdd) +{ + m_HashSet.insert(HashesToAdd.begin(), HashesToAdd.end()); +} + +void +HashKeySet::RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_HashSet.erase(It); + } + else + { + ++It; + } + } +} + +void +HashKeySet::IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const +{ + for (auto It = begin(m_HashSet), ItEnd = end(m_HashSet); It != ItEnd; ++It) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Testing related code follows... +// + +#if ZEN_WITH_TESTS + +void +hashkeyset_forcelink() +{ +} + +#endif + +} // namespace zen diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h deleted file mode 100644 index 5592fbd0a..000000000 --- a/zenstore/include/zenstore/cas.h +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. 
- -#pragma once - -#include "zenstore.h" - -#include <zencore/blake3.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/refcount.h> -#include <zencore/timer.h> - -#include <atomic> -#include <filesystem> -#include <functional> -#include <memory> -#include <string> -#include <unordered_set> - -namespace zen { - -class GcContext; -class CasGc; - -struct CasStoreConfiguration -{ - // Root directory for CAS store - std::filesystem::path RootDirectory; - - // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy - uint64_t TinyValueThreshold = 1024; - - // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy - uint64_t HugeValueThreshold = 1024 * 1024; -}; - -/** Manage a set of IoHash values - */ - -class CasChunkSet -{ -public: - void AddChunkToSet(const IoHash& HashToAdd); - void AddChunksToSet(std::span<const IoHash> HashesToAdd); - void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); - void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); - [[nodiscard]] inline bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } - [[nodiscard]] inline bool IsEmpty() const { return m_ChunkSet.empty(); } - [[nodiscard]] inline size_t GetSize() const { return m_ChunkSet.size(); } - - inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) - { - for (const IoHash& Candidate : Candidates) - { - if (ContainsChunk(Candidate)) - { - MatchFunc(Candidate); - } - } - } - - inline void FilterChunks(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) - { - for (const IoHash& Candidate : Candidates) - { - MatchFunc(Candidate, ContainsChunk(Candidate)); - } - } - -private: - // Q: should we protect this with a lock, or is that a higher level concern? 
- std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet; -}; - -/** Context object for data scrubbing - * - * Data scrubbing is when we traverse stored data to validate it and - * optionally correct/recover - */ - -class ScrubContext -{ -public: - virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks); - inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } - inline bool RunRecovery() const { return m_Recover; } - void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes); - - inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } - inline uint64_t ScrubbedBytes() const { return m_ByteCount; } - -private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; - std::atomic<uint64_t> m_ChunkCount{0}; - std::atomic<uint64_t> m_ByteCount{0}; - CasChunkSet m_BadCas; - CasChunkSet m_BadCid; -}; - -struct CasStoreSize -{ - uint64_t TinySize{}; - uint64_t SmallSize{}; - uint64_t LargeSize{}; - uint64_t TotalSize{}; -}; - -/** Content Addressable Storage interface - - */ - -class CasStore -{ -public: - virtual ~CasStore() = default; - - const CasStoreConfiguration& Config() { return m_Config; } - - struct InsertResult - { - bool New = false; - }; - - virtual void Initialize(const CasStoreConfiguration& Config) = 0; - virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; - virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; - virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; - virtual void Flush() = 0; - virtual void Scrub(ScrubContext& Ctx) = 0; - virtual void GarbageCollect(GcContext& GcCtx) = 0; - virtual CasStoreSize TotalSize() const = 0; - -protected: - CasStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; -}; - -ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(CasGc& Gc); - -void CAS_forcelink(); - -} // namespace zen diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 
4b93a708f..c56b653fc 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -2,8 +2,6 @@ #pragma once -#include "zenstore.h" - #include <zencore/uid.h> #include <zenstore/basicfile.h> diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index b0252a2a6..21e3c3160 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -5,7 +5,7 @@ #include "zenstore.h" #include <zencore/iohash.h> -#include <zenstore/cas.h> +#include <zenstore/hashkeyset.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -15,53 +15,68 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +class GcManager; class CasStore; class CompressedBuffer; class IoBuffer; +class ScrubContext; /** Content Store * - * Data in the content store is referenced by content identifiers (CIDs), rather than their - * literal hash. This class maps uncompressed hashes to compressed hashes and may + * Data in the content store is referenced by content identifiers (CIDs), it works + * with compressed buffers so the CID is expected to be the RAW hash. It stores the + * chunk directly under the RAW hash. + * This class maps uncompressed hashes (CIDs) to compressed hashes and may * be used to deal with other kinds of indirections in the future. For example, if we want * to support chunking then a CID may represent a list of chunks which could be concatenated * to form the referenced chunk. * - * It would likely be possible to implement this mapping in a more efficient way if we - * integrate it into the CAS store itself, so we can avoid maintaining copies of large - * hashes in multiple locations. 
This would also allow us to consolidate commit logs etc - * which would be more resilient than the current split log scheme - * */ + +struct CidStoreSize +{ + uint64_t TinySize = 0; + uint64_t SmallSize = 0; + uint64_t LargeSize = 0; + uint64_t TotalSize = 0; +}; + +struct CidStoreConfiguration +{ + // Root directory for CAS store + std::filesystem::path RootDirectory; + + // Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy + uint64_t TinyValueThreshold = 1024; + + // Threshold above which values are considered 'huge' and managed using the 'huge values' strategy + uint64_t HugeValueThreshold = 1024 * 1024; +}; + class CidStore { public: - CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir); + CidStore(GcManager& Gc); ~CidStore(); struct InsertResult { - IoHash DecompressedId; - IoHash CompressedHash; - bool New = false; + bool New = false; }; - InsertResult AddChunk(CompressedBuffer& ChunkData); - void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed); + void Initialize(const CidStoreConfiguration& Config); + InsertResult AddChunk(const CompressedBuffer& ChunkData); IoBuffer FindChunkByCid(const IoHash& DecompressedId); bool ContainsChunk(const IoHash& DecompressedId); + void FilterChunks(HashKeySet& InOutChunks); void Flush(); void Scrub(ScrubContext& Ctx); - void RemoveCids(CasChunkSet& CasChunks); - CasStoreSize CasSize() const; - - // TODO: add batch filter support - - IoHash RemapCid(const IoHash& DecompressedId); + CidStoreSize TotalSize() const; private: struct Impl; - std::unique_ptr<Impl> m_Impl; + std::unique_ptr<CasStore> m_CasStore; + std::unique_ptr<Impl> m_Impl; }; } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index 398025181..656e594af 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -22,8 +22,8 @@ class logger; namespace zen { -class CasChunkSet; -class CasGc; +class HashKeySet; 
+class GcManager; class CidStore; struct IoHash; @@ -50,18 +50,16 @@ public: GcContext(GcClock::TimePoint Time = GcClock::Now()); ~GcContext(); - void ContributeCids(std::span<const IoHash> Cid); - void ContributeCas(std::span<const IoHash> Hash); - void ContributeCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); + void AddRetainedCids(std::span<const IoHash> Cid); + void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys); void IterateCids(std::function<void(const IoHash&)> Callback); void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc); - void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc); - void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&, bool)>&& FilterFunc); + void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc); - void DeletedCas(std::span<const IoHash> Cas); - CasChunkSet& DeletedCas(); + void AddDeletedCids(std::span<const IoHash> Cas); + const HashKeySet& DeletedCids(); std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const; @@ -97,13 +95,13 @@ private: class GcContributor { public: - GcContributor(CasGc& Gc); + GcContributor(GcManager& Gc); ~GcContributor(); virtual void GatherReferences(GcContext& GcCtx) = 0; protected: - CasGc& m_Gc; + GcManager& m_Gc; }; struct GcStorageSize @@ -117,23 +115,23 @@ struct GcStorageSize class GcStorage { public: - GcStorage(CasGc& Gc); + GcStorage(GcManager& Gc); ~GcStorage(); virtual void CollectGarbage(GcContext& GcCtx) = 0; virtual GcStorageSize StorageSize() const = 0; private: - CasGc& m_Gc; + GcManager& m_Gc; }; /** GC orchestrator */ -class CasGc +class GcManager { public: - CasGc(); - ~CasGc(); + GcManager(); + ~GcManager(); void AddGcContributor(GcContributor* Contributor); void RemoveGcContributor(GcContributor* Contributor); @@ -143,12 +141,14 @@ public: void 
CollectGarbage(GcContext& GcCtx); - void SetCidStore(CidStore* Cids); - void OnNewCidReferences(std::span<IoHash> Hashes); - void OnCommittedCidReferences(std::span<IoHash> Hashes); - void OnDroppedCidReferences(std::span<IoHash> Hashes); GcStorageSize TotalStorageSize() const; +#if ZEN_USE_REF_TRACKING + void OnNewCidReferences(std::span<IoHash> Hashes); + void OnCommittedCidReferences(std::span<IoHash> Hashes); + void OnDroppedCidReferences(std::span<IoHash> Hashes); +#endif + private: mutable RwLock m_Lock; std::vector<GcContributor*> m_GcContribs; @@ -180,7 +180,7 @@ struct GcSchedulerConfig class GcScheduler { public: - GcScheduler(CasGc& CasGc); + GcScheduler(GcManager& GcManager); ~GcScheduler(); void Initialize(const GcSchedulerConfig& Config); @@ -201,7 +201,7 @@ private: spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - CasGc& m_CasGc; + GcManager& m_GcManager; GcSchedulerConfig m_Config; GcClock::TimePoint m_LastGcTime{}; GcClock::TimePoint m_NextGcTime{}; diff --git a/zenstore/include/zenstore/hashkeyset.h b/zenstore/include/zenstore/hashkeyset.h new file mode 100644 index 000000000..411a6256e --- /dev/null +++ b/zenstore/include/zenstore/hashkeyset.h @@ -0,0 +1,54 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+
+#pragma once
+
+#include "zenstore.h"
+
+#include <zencore/iohash.h>
+
+#include <cstddef>
+#include <functional>
+#include <span>
+#include <unordered_set>
+
+namespace zen {
+
+/** Manage a set of IoHash values
+ *
+ * Wraps a std::unordered_set keyed on IoHash (hashed with IoHash::Hasher).
+ * Nothing in this header synchronizes access to the set; see the open
+ * question on m_HashSet below.
+ */
+
+class HashKeySet
+{
+public:
+    // Declared here, defined out of line (the bodies are not part of this header).
+    void AddHashToSet(const IoHash& HashToAdd);
+    void AddHashesToSet(std::span<const IoHash> HashesToAdd);
+    void RemoveHashesIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+    void IterateHashes(std::function<void(const IoHash& Hash)>&& Callback) const;
+
+    // Inline queries that read m_HashSet directly.
+    [[nodiscard]] inline bool   ContainsHash(const IoHash& Hash) const { return m_HashSet.find(Hash) != m_HashSet.end(); }
+    [[nodiscard]] inline bool   IsEmpty() const { return m_HashSet.empty(); }
+    [[nodiscard]] inline size_t GetSize() const { return m_HashSet.size(); }
+
+    /** Invoke MatchFunc(Candidate) for every candidate that is present in the set.
+     *
+     * Candidates are visited in the order they appear in the span; candidates
+     * that are not in the set are skipped silently.
+     */
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            if (ContainsHash(Candidate))
+            {
+                MatchFunc(Candidate);
+            }
+        }
+    }
+
+    /** Invoke MatchFunc(Candidate, IsInSet) for every candidate.
+     *
+     * Unlike the single-argument overload this one calls back for all
+     * candidates; the bool argument is the result of ContainsHash(Candidate).
+     */
+    inline void FilterHashes(std::span<const IoHash> Candidates, Invocable<const IoHash&, bool> auto MatchFunc) const
+    {
+        for (const IoHash& Candidate : Candidates)
+        {
+            MatchFunc(Candidate, ContainsHash(Candidate));
+        }
+    }
+
+private:
+    // Q: should we protect this with a lock, or is that a higher level concern?
+    std::unordered_set<IoHash, IoHash::Hasher> m_HashSet;
+};
+
+// Called from zenstore_forcelinktests() in zenstore.cpp; defined out of line.
+void hashkeyset_forcelink();
+
+}  // namespace zen
diff --git a/zenstore/include/zenstore/scrubcontext.h b/zenstore/include/zenstore/scrubcontext.h new file mode 100644 index 000000000..bf906492c --- /dev/null +++ b/zenstore/include/zenstore/scrubcontext.h @@ -0,0 +1,40 @@ +// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/timer.h>
+#include <zenstore/hashkeyset.h>
+
+#include <atomic>
+#include <cstdint>
+#include <span>
+
+namespace zen {
+
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ *
+ * The context accumulates the number of chunks/bytes scrubbed (atomic
+ * counters) and the set of content ids that were reported as bad.
+ */
+
+class ScrubContext
+{
+public:
+    // The class has a virtual member, so give it a virtual destructor to make
+    // destruction through a ScrubContext pointer well defined.
+    virtual ~ScrubContext() = default;
+
+    // Records the given hashes in the bad-Cid set. Virtual, so a derived
+    // context can replace this behavior.
+    virtual void ReportBadCidChunks(std::span<IoHash> BadCasChunks) { m_BadCid.AddHashesToSet(BadCasChunks); }
+
+    // Value of GetHifreqTimerValue() captured when this context was constructed.
+    inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+
+    // Returns m_Recover, which defaults to true; this class exposes no setter for it.
+    inline bool RunRecovery() const { return m_Recover; }
+
+    // Adds to the running totals; each counter is updated with an atomic fetch_add.
+    void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
+    {
+        m_ChunkCount.fetch_add(ChunkCount);
+        m_ByteCount.fetch_add(ChunkBytes);
+    }
+
+    inline uint64_t ScrubbedChunks() const { return m_ChunkCount; }
+    inline uint64_t ScrubbedBytes() const { return m_ByteCount; }
+
+    // Returned by const reference so callers can inspect the set without
+    // copying it. The reference is only valid while this context is alive, and
+    // the set is not guarded by any lock in this class.
+    const HashKeySet& BadCids() const { return m_BadCid; }
+
+private:
+    uint64_t              m_ScrubTime = GetHifreqTimerValue();
+    bool                  m_Recover   = true;
+    std::atomic<uint64_t> m_ChunkCount{0};
+    std::atomic<uint64_t> m_ByteCount{0};
+    HashKeySet            m_BadCid;
+};
+
+}  // namespace zen
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index 5f40b7f60..836cdf691 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -4,8 +4,10 @@ #include <zenstore/basicfile.h> #include <zenstore/blockstore.h> -#include <zenstore/cas.h> #include <zenstore/gc.h> +#include <zenstore/hashkeyset.h> + +#include "cas.h" #include "compactcas.h" #include "filecas.h" @@ -20,6 +22,7 @@ zenstore_forcelinktests() blockstore_forcelink(); compactcas_forcelink(); gc_forcelink(); + hashkeyset_forcelink(); } } // namespace zen |