aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore9
-rw-r--r--zen/chunk/chunk.cpp5
-rw-r--r--zen/cmds/deploy.cpp86
-rw-r--r--zen/cmds/deploy.h29
-rw-r--r--zen/cmds/print.cpp23
-rw-r--r--zen/zen.cpp3
-rw-r--r--zen/zen.vcxproj2
-rw-r--r--zen/zen.vcxproj.filters6
-rw-r--r--zencore/compress.cpp1
-rw-r--r--zencore/filesystem.cpp4
-rw-r--r--zencore/include/zencore/refcount.h5
-rw-r--r--zencore/include/zencore/zencore.h4
-rw-r--r--zencore/iobuffer.cpp2
-rw-r--r--zenserver-test/zenserver-test.cpp19
-rw-r--r--zenserver-test/zenserver-test.vcxproj2
-rw-r--r--zenserver/cache/cachetracking.cpp359
-rw-r--r--zenserver/cache/cachetracking.h34
-rw-r--r--zenserver/cache/structuredcache.cpp21
-rw-r--r--zenserver/cache/structuredcache.h4
-rw-r--r--zenserver/cache/structuredcachestore.cpp227
-rw-r--r--zenserver/cache/structuredcachestore.h137
-rw-r--r--zenserver/config.h8
-rw-r--r--zenserver/diag/logging.cpp13
-rw-r--r--zenserver/projectstore.cpp186
-rw-r--r--zenserver/projectstore.h25
-rw-r--r--zenserver/upstream/zen.h6
-rw-r--r--zenserver/zenserver.cpp49
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters2
-rw-r--r--zenstore/CAS.cpp166
-rw-r--r--zenstore/caslog.cpp24
-rw-r--r--zenstore/cidstore.cpp50
-rw-r--r--zenstore/compactcas.cpp105
-rw-r--r--zenstore/compactcas.h38
-rw-r--r--zenstore/filecas.cpp196
-rw-r--r--zenstore/filecas.h21
-rw-r--r--zenstore/gc.cpp183
-rw-r--r--zenstore/include/zenstore/CAS.h36
-rw-r--r--zenstore/include/zenstore/caslog.h4
-rw-r--r--zenstore/include/zenstore/cidstore.h7
-rw-r--r--zenstore/include/zenstore/gc.h88
-rw-r--r--zenstore/zenstore.cpp2
-rw-r--r--zenstore/zenstore.vcxproj2
-rw-r--r--zenutil/zenutil.vcxproj2
44 files changed, 1718 insertions, 479 deletions
diff --git a/.gitignore b/.gitignore
index bf237a7b3..f84d4d94e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,7 +19,6 @@
[Rr]eleases/
x64/
x86/
-bld/
[Bb]in/
[Oo]bj/
[Ll]og/
@@ -204,14 +203,10 @@ ServiceFabricBackup/
*.rptproj.bak
-# generated build files
-makefile
-
# Python Tools for Visual Studio (PTVS)
__pycache__/
*.pyc
-
/vcpkg_installed
.data/
.minio_data/
@@ -225,3 +220,7 @@ tags
.tags
!tags/
/compile_commands.json
+
+# generated build files
+makefile
+/vsxmake*/
diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp
index 3283a8b66..043832dd3 100644
--- a/zen/chunk/chunk.cpp
+++ b/zen/chunk/chunk.cpp
@@ -15,6 +15,7 @@
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zenstore/cas.h>
+#include <zenstore/gc.h>
#include "../internalfile.h"
@@ -942,12 +943,14 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::unique_ptr<zen::CasStore> CasStore;
+ zen::CasGc Gc;
+
if (!m_RootDirectory.empty())
{
zen::CasStoreConfiguration Config;
Config.RootDirectory = m_RootDirectory;
- CasStore.reset(zen::CreateCasStore());
+ CasStore.reset(zen::CreateCasStore(Gc));
CasStore->Initialize(Config);
}
diff --git a/zen/cmds/deploy.cpp b/zen/cmds/deploy.cpp
deleted file mode 100644
index d60392dd5..000000000
--- a/zen/cmds/deploy.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "deploy.h"
-
-#include <zencore/logging.h>
-#include <zencore/string.h>
-
-namespace zen {
-
-DeployCommand::DeployCommand()
-{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_options()("no-clone", "Do not perform block clone", cxxopts::value(m_NoClone)->default_value("false"));
- m_Options.add_options()("clean",
- "Make clean deploy (i.e remove anything in target first)",
- cxxopts::value(m_IsClean)->default_value("false"));
- m_Options.add_option("", "s", "source", "Deploy source", cxxopts::value(m_CopySource), "<build store>");
- m_Options.add_option("", "t", "target", "Deploy target", cxxopts::value(m_CopyTarget), "<directory>");
- m_Options.add_option("", "", "positional", "Positional arguments", cxxopts::value(m_Positional), "");
-}
-
-DeployCommand::~DeployCommand() = default;
-
-int
-DeployCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
-{
- ZEN_UNUSED(GlobalOptions);
-
- m_Options.parse_positional({"source", "target", "positional"});
-
- auto result = m_Options.parse(argc, argv);
-
- if (result.count("help"))
- {
- std::cout << m_Options.help({"", "Group"}) << std::endl;
-
- return 0;
- }
-
- // Validate arguments
-
- if (m_CopySource.empty())
- throw std::runtime_error("No source specified");
-
- if (m_CopyTarget.empty())
- throw std::runtime_error("No target specified");
-
- std::filesystem::path ToPath;
-
- ToPath = m_CopyTarget;
-
- const bool IsTargetDir = std::filesystem::is_directory(ToPath);
- bool IsTargetNew = !std::filesystem::exists(ToPath);
-
- if (!IsTargetNew && !IsTargetDir)
- {
- throw std::runtime_error("Invalid target specification (needs to be a directory)");
- }
-
- zen::ExtendableStringBuilder<128> Path8;
- zen::WideToUtf8(ToPath.c_str(), Path8);
-
- if (IsTargetNew == false && m_IsClean)
- {
- ZEN_INFO("Clean deploy -- deleting directory {}", Path8.c_str());
-
- std::filesystem::remove_all(ToPath);
-
- IsTargetNew = true; // Create fresh new directory
- }
-
- if (IsTargetNew)
- {
- ZEN_INFO("Creating directory {}", Path8.c_str());
-
- std::filesystem::create_directories(ToPath);
- }
-
- ZEN_INFO("Starting deploy operation...");
-
- // TODO: implement!
-
- return 0;
-}
-
-} // namespace zen
diff --git a/zen/cmds/deploy.h b/zen/cmds/deploy.h
deleted file mode 100644
index 975caf9e9..000000000
--- a/zen/cmds/deploy.h
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "../zen.h"
-
-namespace zen {
-
-/** Deploy files from Zen build store
- */
-class DeployCommand : public ZenCmdBase
-{
-public:
- DeployCommand();
- ~DeployCommand();
-
- virtual cxxopts::Options* Options() override { return &m_Options; }
- virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
-
-private:
- cxxopts::Options m_Options{"deploy", "Deploy cooked data"};
- std::vector<std::string> m_Positional;
- std::string m_CopySource;
- std::string m_CopyTarget;
- bool m_NoClone = false;
- bool m_IsClean = false;
-};
-
-} // namespace zen
diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp
index aac6afd44..1a73443dc 100644
--- a/zen/cmds/print.cpp
+++ b/zen/cmds/print.cpp
@@ -3,6 +3,7 @@
#include "print.h"
#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/string.h>
@@ -40,9 +41,25 @@ PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_Filename.empty())
throw std::runtime_error("No file specified");
- zen::FileContents Fc = zen::ReadFile(m_Filename);
- IoBuffer Data = Fc.Flatten();
- zen::CbObject Object{SharedBuffer(Data)};
+ zen::FileContents Fc = zen::ReadFile(m_Filename);
+
+ if (Fc.ErrorCode)
+ {
+ zen::ConsoleLog().error("Failed to open file '{}': {}", m_Filename, Fc.ErrorCode.message());
+
+ return 1;
+ }
+
+ IoBuffer Data = Fc.Flatten();
+
+ if (CbValidateError Result = ValidateCompactBinary(Data, CbValidateMode::All); Result != CbValidateError::None)
+ {
+ zen::ConsoleLog().error("Data in file '{}' does not appear to be compact binary (validation error {:#x})", m_Filename, Result);
+
+ return 1;
+ }
+
+ zen::CbObject Object{SharedBuffer(Data)};
zen::StringBuilder<1024> ObjStr;
zen::CompactBinaryToJson(Object, ObjStr);
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 3c33ff5e0..f5088533f 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -7,7 +7,6 @@
#include "cmds/cache.h"
#include "cmds/copy.h"
#include "cmds/dedup.h"
-#include "cmds/deploy.h"
#include "cmds/hash.h"
#include "cmds/print.h"
#include "cmds/run.h"
@@ -102,7 +101,6 @@ main(int argc, char** argv)
HashCommand HashCmd;
CopyCommand CopyCmd;
DedupCommand DedupCmd;
- DeployCommand DeployCmd;
DropCommand DropCmd;
ChunkCommand ChunkCmd;
RunCommand RunCmd;
@@ -127,7 +125,6 @@ main(int argc, char** argv)
// clang-format off
{"chunk", &ChunkCmd, "Perform chunking"},
{"copy", &CopyCmd, "Copy file(s)"},
- {"deploy", &DeployCmd, "Deploy data"},
{"dedup", &DedupCmd, "Dedup files"},
{"drop", &DropCmd, "Drop cache bucket(s)"},
{"hash", &HashCmd, "Compute file hashes"},
diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj
index f31c0bc17..717319de4 100644
--- a/zen/zen.vcxproj
+++ b/zen/zen.vcxproj
@@ -97,7 +97,6 @@
<ClCompile Include="cmds\cache.cpp" />
<ClCompile Include="cmds\copy.cpp" />
<ClCompile Include="cmds\dedup.cpp" />
- <ClCompile Include="cmds\deploy.cpp" />
<ClCompile Include="cmds\hash.cpp" />
<ClCompile Include="cmds\print.cpp" />
<ClCompile Include="cmds\run.cpp" />
@@ -113,7 +112,6 @@
<ClInclude Include="cmds\cache.h" />
<ClInclude Include="cmds\copy.h" />
<ClInclude Include="cmds\dedup.h" />
- <ClInclude Include="cmds\deploy.h" />
<ClInclude Include="cmds\hash.h" />
<ClInclude Include="cmds\print.h" />
<ClInclude Include="cmds\run.h" />
diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters
index d983b413c..79580286c 100644
--- a/zen/zen.vcxproj.filters
+++ b/zen/zen.vcxproj.filters
@@ -10,9 +10,6 @@
<ClCompile Include="cmds\dedup.cpp">
<Filter>cmds</Filter>
</ClCompile>
- <ClCompile Include="cmds\deploy.cpp">
- <Filter>cmds</Filter>
- </ClCompile>
<ClCompile Include="cmds\copy.cpp">
<Filter>cmds</Filter>
</ClCompile>
@@ -37,9 +34,6 @@
<ClInclude Include="cmds\hash.h">
<Filter>cmds</Filter>
</ClInclude>
- <ClInclude Include="cmds\deploy.h">
- <Filter>cmds</Filter>
- </ClInclude>
<ClInclude Include="cmds\dedup.h">
<Filter>cmds</Filter>
</ClInclude>
diff --git a/zencore/compress.cpp b/zencore/compress.cpp
index 61f1effe4..35a5acb3a 100644
--- a/zencore/compress.cpp
+++ b/zencore/compress.cpp
@@ -1205,7 +1205,6 @@ TEST_CASE("CompressedBuffer")
SUBCASE("copy uncompressed range")
{
- const uint64_t BlockSize = 64 * sizeof(uint64_t);
const uint64_t N = 1000;
std::vector<uint64_t> ExpectedValues = GenerateData(N);
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index a642e2cf6..bcbb36c73 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -701,9 +701,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr
}
else if (DirInfo->FileAttributes & FILE_ATTRIBUTE_DEVICE)
{
- ZEN_WARN("encountered device node during file system traversal: {} found in {}",
- WideToUtf8(FileName),
- WideToUtf8(RootDir.c_str()));
+ ZEN_WARN("encountered device node during file system traversal: '{}' found in '{}'", WideToUtf8(FileName), RootDir);
}
else
{
diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h
index 7167ab3b5..1873ce48e 100644
--- a/zencore/include/zencore/refcount.h
+++ b/zencore/include/zencore/refcount.h
@@ -117,8 +117,9 @@ public:
inline ~Ref() { m_Ref && m_Ref->Release(); }
template<typename DerivedType>
- requires std::derived_from<DerivedType, T>
- inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) {}
+ requires std::derived_from<DerivedType, T> inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref)
+ {
+ }
[[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; }
inline explicit operator bool() const { return m_Ref != nullptr; }
diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h
index d654770d0..6b9a0f658 100644
--- a/zencore/include/zencore/zencore.h
+++ b/zencore/include/zencore/zencore.h
@@ -62,11 +62,13 @@
#if ZEN_COMPILER_MSC
# pragma warning(disable : 4324) // warning C4324: '<type>': structure was padded due to alignment specifier
+# pragma warning(default : 4668) // warning C4668: 'symbol' is not defined as a preprocessor macro, replacing with '0' for 'directives'
+# pragma warning(default : 4100) // warning C4100: 'identifier' : unreferenced formal parameter
#endif
#ifndef ZEN_THIRD_PARTY_INCLUDES_START
# if ZEN_COMPILER_MSC
-# define ZEN_THIRD_PARTY_INCLUDES_START __pragma(warning(push)) __pragma(warning(disable : 4668))
+# define ZEN_THIRD_PARTY_INCLUDES_START __pragma(warning(push)) __pragma(warning(disable : 4668 4127))
# else
# define ZEN_THIRD_PARTY_INCLUDES_START
# endif
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index 7077942bf..6cee3f60d 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -500,7 +500,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName)
Handle = DataFile.Detach();
#else
- int Fd = open(FileName.native().c_str(), O_RDONLY);
+ int Fd = open(FileName.native().c_str(), O_RDONLY);
if (Fd < 0)
{
return {};
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 8f38cc1be..b3a4348f0 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -54,6 +54,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include <asio.hpp>
+#define ZEN_USE_EXEC 0 // Note: this should really be a global define to match the zenserver definition
+
//////////////////////////////////////////////////////////////////////////
#include "projectclient.h"
@@ -1744,6 +1746,8 @@ TEST_CASE("zcache.policy")
zen::CbPackage Package;
const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
+
CbObject CacheRecord = Package.GetObject();
std::vector<IoHash> AttachmentKeys;
@@ -1763,6 +1767,7 @@ TEST_CASE("zcache.policy")
zen::CbPackage Package;
const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
CHECK(Package.GetAttachments().size() != 0);
}
}
@@ -1801,6 +1806,7 @@ TEST_CASE("zcache.policy")
zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
zen::CbPackage Package;
const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
CbObject CacheRecord = Package.GetObject();
std::vector<IoHash> AttachmentKeys;
@@ -1820,6 +1826,7 @@ TEST_CASE("zcache.policy")
zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
zen::CbPackage Package;
const bool Ok = Package.TryLoad(Body);
+ CHECK(Ok);
CHECK(Package.GetAttachments().size() != 0);
}
@@ -2197,6 +2204,8 @@ TEST_CASE("zcache.rpc")
}
}
+# if ZEN_USE_EXEC
+
struct RemoteExecutionRequest
{
RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath)
@@ -2352,7 +2361,7 @@ TEST_CASE("exec.basic")
RemoteRequest.Prep();
zen::CbObject Result = RemoteRequest.Exec();
- CHECK(Result["exitcode"].AsInt32(-1) == 0);
+ CHECK(Result["exitcode"sv].AsInt32(-1) == 0);
}
{
@@ -2361,14 +2370,14 @@ TEST_CASE("exec.basic")
RemoteRequest.Prep();
zen::CbObject Result = RemoteRequest.Exec();
- CHECK(Result["exitcode"].AsInt32(-1) == 1);
+ CHECK(Result["exitcode"sv].AsInt32(-1) == 1);
}
}
TEST_CASE("mesh.basic")
{
// --mesh option only available with ZEN_ENABLE_MESH
-# if ZEN_ENABLE_MESH
+# if ZEN_ENABLE_MESH
using namespace std::literals;
const int kInstanceCount = 4;
@@ -2393,9 +2402,11 @@ TEST_CASE("mesh.basic")
Instance->WaitUntilReady();
}
-# endif
+# endif
}
+# endif
+
class ZenServerTestHelper
{
public:
diff --git a/zenserver-test/zenserver-test.vcxproj b/zenserver-test/zenserver-test.vcxproj
index a39fce7ec..d632e395e 100644
--- a/zenserver-test/zenserver-test.vcxproj
+++ b/zenserver-test/zenserver-test.vcxproj
@@ -67,7 +67,6 @@
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
@@ -79,7 +78,6 @@
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
diff --git a/zenserver/cache/cachetracking.cpp b/zenserver/cache/cachetracking.cpp
new file mode 100644
index 000000000..6a9b2f403
--- /dev/null
+++ b/zenserver/cache/cachetracking.cpp
@@ -0,0 +1,359 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "cachetracking.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/endian.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
+#include <fmt/format.h>
+#include <rocksdb/db.h>
+#include <tsl/robin_map.h>
+#include <tsl/robin_set.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace fmt::literals;
+
+namespace rocksdb = ROCKSDB_NAMESPACE;
+
+static constinit auto Epoch = std::chrono::time_point<std::chrono::system_clock>{};
+
+static uint64_t
+GetCurrentCacheTimeStamp()
+{
+ auto Duration = std::chrono::system_clock::now() - Epoch;
+ uint64_t Millis = std::chrono::duration_cast<std::chrono::milliseconds>(Duration).count();
+
+ return Millis;
+}
+
+struct CacheAccessSnapshot
+{
+public:
+ void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
+ {
+ BucketTracker* Tracker = GetBucket(std::string(BucketSegment));
+
+ Tracker->Track(HashKey);
+ }
+
+ void SerializeSnapshot(CbObjectWriter& Cbo)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ for (const auto& Kv : m_BucketMapping)
+ {
+ Cbo.BeginArray(Kv.first);
+ m_Buckets[Kv.second]->SerializeSnapshotAndClear(Cbo);
+ Cbo.EndArray();
+ }
+ }
+
+private:
+ struct BucketTracker
+ {
+ RwLock Lock;
+ tsl::robin_set<IoHash> AccessedKeys;
+
+ void Track(const IoHash& HashKey)
+ {
+ if (RwLock::SharedLockScope _(Lock); AccessedKeys.contains(HashKey))
+ {
+ return;
+ }
+
+ RwLock::ExclusiveLockScope _(Lock);
+
+ AccessedKeys.insert(HashKey);
+ }
+
+ void SerializeSnapshotAndClear(CbObjectWriter& Cbo)
+ {
+ RwLock::ExclusiveLockScope _(Lock);
+
+ for (const IoHash& Hash : AccessedKeys)
+ {
+ Cbo.AddHash(Hash);
+ }
+
+ AccessedKeys.clear();
+ }
+ };
+
+ BucketTracker* GetBucket(const std::string& BucketName)
+ {
+ RwLock::SharedLockScope _(m_Lock);
+
+ if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
+ {
+ _.ReleaseNow();
+
+ return AddNewBucket(BucketName);
+ }
+ else
+ {
+ return m_Buckets[It->second].get();
+ }
+ }
+
+ BucketTracker* AddNewBucket(const std::string& BucketName)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_BucketMapping.find(BucketName); It == m_BucketMapping.end())
+ {
+ const uint32_t BucketIndex = gsl::narrow<uint32_t>(m_Buckets.size());
+ m_Buckets.emplace_back(std::make_unique<BucketTracker>());
+ m_BucketMapping[BucketName] = BucketIndex;
+
+ return m_Buckets[BucketIndex].get();
+ }
+ else
+ {
+ return m_Buckets[It->second].get();
+ }
+ }
+
+ RwLock m_Lock;
+ std::vector<std::unique_ptr<BucketTracker>> m_Buckets;
+ tsl::robin_map<std::string, uint32_t> m_BucketMapping;
+};
+
+struct ZenCacheTracker::Impl
+{
+ Impl(std::filesystem::path StateDirectory)
+ {
+ std::filesystem::path StatsDbPath{StateDirectory / ".zdb"};
+
+ std::string RocksdbPath = ToUtf8(StatsDbPath);
+
+ ZEN_DEBUG("opening tracker db at '{}'", RocksdbPath);
+
+ rocksdb::DB* Db = nullptr;
+ rocksdb::DBOptions Options;
+ Options.create_if_missing = true;
+
+ std::vector<std::string> ExistingColumnFamilies;
+ rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies);
+
+ std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors;
+
+ if (Status.IsPathNotFound())
+ {
+ ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}});
+ }
+ else if (Status.ok())
+ {
+ for (const std::string& Column : ExistingColumnFamilies)
+ {
+ rocksdb::ColumnFamilyDescriptor ColumnFamily;
+ ColumnFamily.name = Column;
+ ColumnDescriptors.push_back(ColumnFamily);
+ }
+ }
+ else
+ {
+ throw std::runtime_error("column family iteration failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str());
+ }
+
+ Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db);
+
+ if (!Status.ok())
+ {
+ throw std::runtime_error("database open failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str());
+ }
+
+ m_RocksDb.reset(Db);
+ }
+
+ ~Impl()
+ {
+ for (auto* Column : m_RocksDbColumnHandles)
+ {
+ delete Column;
+ }
+
+ m_RocksDbColumnHandles.clear();
+ }
+
+ struct KeyStruct
+ {
+ uint64_t TimestampLittleEndian;
+ };
+
+ void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey) { m_CurrentSnapshot.TrackAccess(BucketSegment, HashKey); }
+
+ void SaveSnapshot()
+ {
+ CbObjectWriter Cbo;
+ m_CurrentSnapshot.SerializeSnapshot(Cbo);
+ IoBuffer SnapshotBuffer = Cbo.Save().GetBuffer().AsIoBuffer();
+
+ const KeyStruct Key{.TimestampLittleEndian = ToNetworkOrder(GetCurrentCacheTimeStamp())};
+ rocksdb::Slice KeySlice{(const char*)&Key, sizeof Key};
+ rocksdb::Slice ValueSlice{(char*)SnapshotBuffer.Data(), SnapshotBuffer.Size()};
+
+ rocksdb::WriteOptions Wo;
+ m_RocksDb->Put(Wo, KeySlice, ValueSlice);
+ }
+
+ void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
+ {
+ rocksdb::ManagedSnapshot Snap(m_RocksDb.get());
+
+ rocksdb::ReadOptions Ro;
+ Ro.snapshot = Snap.snapshot();
+
+ std::unique_ptr<rocksdb::Iterator> It{m_RocksDb->NewIterator(Ro)};
+
+ const KeyStruct ZeroKey{.TimestampLittleEndian = 0};
+ rocksdb::Slice ZeroKeySlice{(const char*)&ZeroKey, sizeof ZeroKey};
+
+ It->Seek(ZeroKeySlice);
+
+ while (It->Valid())
+ {
+ rocksdb::Slice KeySlice = It->key();
+ rocksdb::Slice ValueSlice = It->value();
+
+ if (KeySlice.size() == sizeof(KeyStruct))
+ {
+ IoBuffer ValueBuffer(IoBuffer::Wrap, ValueSlice.data(), ValueSlice.size());
+
+ CbObject Value = LoadCompactBinaryObject(ValueBuffer);
+
+ uint64_t Key = FromNetworkOrder(*reinterpret_cast<const uint64_t*>(KeySlice.data()));
+
+ Callback(Key, Value);
+ }
+
+ It->Next();
+ }
+ }
+
+ std::unique_ptr<rocksdb::DB> m_RocksDb;
+ std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles;
+ CacheAccessSnapshot m_CurrentSnapshot;
+};
+
+ZenCacheTracker::ZenCacheTracker(std::filesystem::path StateDirectory) : m_Impl(new Impl(StateDirectory))
+{
+}
+
+ZenCacheTracker::~ZenCacheTracker()
+{
+ delete m_Impl;
+}
+
+void
+ZenCacheTracker::TrackAccess(std::string_view BucketSegment, const IoHash& HashKey)
+{
+ m_Impl->TrackAccess(BucketSegment, HashKey);
+}
+
+void
+ZenCacheTracker::SaveSnapshot()
+{
+ m_Impl->SaveSnapshot();
+}
+
+void
+ZenCacheTracker::IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback)
+{
+ m_Impl->IterateSnapshots(std::move(Callback));
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("z$.tracker")
+{
+ using namespace fmt::literals;
+ using namespace std::literals;
+
+ const uint64_t t0 = GetCurrentCacheTimeStamp();
+
+ ScopedTemporaryDirectory TempDir;
+
+ ZenCacheTracker Zcs(TempDir.Path());
+
+ tsl::robin_set<IoHash> KeyHashes;
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
+
+ KeyHashes.insert(KeyHash);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ for (int i = 0; i < 10000; ++i)
+ {
+ IoHash KeyHash = IoHash::HashBuffer(&i, sizeof i);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ Zcs.SaveSnapshot();
+
+ for (int n = 0; n < 10; ++n)
+ {
+ for (int i = 0; i < 1000; ++i)
+ {
+ const int Index = i + n * 1000;
+ IoHash KeyHash = IoHash::HashBuffer(&Index, sizeof Index);
+
+ Zcs.TrackAccess("foo"sv, KeyHash);
+ }
+
+ Zcs.SaveSnapshot();
+ }
+
+ Zcs.SaveSnapshot();
+
+ const uint64_t t1 = GetCurrentCacheTimeStamp();
+
+ int SnapshotCount = 0;
+
+ Zcs.IterateSnapshots([&](uint64_t TimeStamp, CbObject Snapshot) {
+ CHECK(TimeStamp >= t0);
+ CHECK(TimeStamp <= t1);
+
+ for (auto& Field : Snapshot)
+ {
+ CHECK_EQ(Field.GetName(), "foo"sv);
+
+ const CbArray& Array = Field.AsArray();
+
+ for (const auto& Element : Array)
+ {
+ CHECK(KeyHashes.contains(Element.GetValue().AsHash()));
+ }
+ }
+
+ ++SnapshotCount;
+ });
+
+ CHECK_EQ(SnapshotCount, 11);
+}
+
+#endif
+
+void
+cachetracker_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/zenserver/cache/cachetracking.h b/zenserver/cache/cachetracking.h
new file mode 100644
index 000000000..d4bd8da25
--- /dev/null
+++ b/zenserver/cache/cachetracking.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <zencore/iohash.h>
+
+#include <filesystem>
+#include <functional>
+#include <stdint.h>
+
+namespace zen {
+
+class CbObject;
+
+/**
+ */
+
+class ZenCacheTracker
+{
+public:
+ ZenCacheTracker(std::filesystem::path StateDirectory);
+ ~ZenCacheTracker();
+
+ void TrackAccess(std::string_view BucketSegment, const IoHash& HashKey);
+ void SaveSnapshot();
+ void IterateSnapshots(std::function<void(uint64_t TimeStamp, CbObject Snapshot)>&& Callback);
+
+private:
+ struct Impl;
+
+ Impl* m_Impl = nullptr;
+};
+
+void cachetracker_forcelink();
+
+} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 726bd7cdb..53e1b1c61 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1,7 +1,10 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include "structuredcache.h"
+
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
@@ -15,15 +18,12 @@
//#include "cachekey.h"
#include "monitoring/httpstats.h"
-#include "structuredcache.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
-#include <zencore/compactbinarypackage.h>
-
#include <algorithm>
#include <atomic>
#include <filesystem>
@@ -59,7 +59,6 @@ struct AttachmentCount
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CasStore& InStore,
CidStore& InCidStore,
HttpStatsService& StatsService,
HttpStatusService& StatusService,
@@ -68,7 +67,6 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCac
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
-, m_CasStore(InStore)
, m_CidStore(InCidStore)
, m_UpstreamCache(std::move(UpstreamCache))
{
@@ -105,7 +103,6 @@ HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
m_LastScrubTime = Ctx.ScrubTimestamp();
- m_CasStore.Scrub(Ctx);
m_CidStore.Scrub(Ctx);
m_CacheStore.Scrub(Ctx);
}
@@ -618,12 +615,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
- Payload = UpstreamResult.Value;
- IoHash ChunkHash = IoHash::HashBuffer(Payload);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
- InUpstreamCache = true;
-
- m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
+ InUpstreamCache = true;
}
else
{
@@ -691,9 +684,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv);
}
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
-
- m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 51073d05d..9ee7da99b 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -18,6 +18,8 @@ namespace zen {
class CasStore;
class CidStore;
+class CbObjectView;
+class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
enum class CachePolicy : uint32_t;
@@ -54,7 +56,6 @@ class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider
{
public:
HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CasStore& InCasStore,
CidStore& InCidStore,
HttpStatsService& StatsService,
HttpStatusService& StatusService,
@@ -101,7 +102,6 @@ private:
ZenCacheStore& m_CacheStore;
HttpStatsService& m_StatsService;
HttpStatusService& m_StatusService;
- CasStore& m_CasStore;
CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
uint64_t m_LastScrubTime = 0;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 8b9ce8ff9..44226457c 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -2,22 +2,25 @@
#include "structuredcachestore.h"
-#include <zencore/except.h>
-#include <zencore/windows.h>
+#include "cachetracking.h"
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
+#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/thread.h>
+#include <zencore/windows.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
@@ -25,6 +28,7 @@
#include <concepts>
#include <filesystem>
+#include <memory_resource>
#include <ranges>
#include <unordered_map>
@@ -39,12 +43,14 @@ namespace zen {
using namespace fmt::literals;
-ZenCacheStore::ZenCacheStore(const std::filesystem::path& RootDir) : m_DiskLayer{RootDir}
+ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcContributor(Gc), m_DiskLayer(RootDir)
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
m_DiskLayer.DiscoverBuckets();
+
+ m_AccessTracker.reset(new ZenCacheTracker(RootDir));
}
ZenCacheStore::~ZenCacheStore()
@@ -56,21 +62,27 @@ ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheVal
{
bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
+ auto _ = MakeGuard([&] {
+ if (!Ok)
+ return;
+
+ m_AccessTracker->TrackAccess(InBucket, HashKey);
+ });
+
if (Ok)
{
ZEN_ASSERT(OutValue.Value.Size());
+
+ return true;
}
- if (!Ok)
- {
- Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
+ Ok = m_DiskLayer.Get(InBucket, HashKey, OutValue);
- if (Ok)
- {
- ZEN_ASSERT(OutValue.Value.Size());
- }
+ if (Ok)
+ {
+ ZEN_ASSERT(OutValue.Value.Size());
- if (Ok && (OutValue.Value.Size() <= m_DiskLayerSizeThreshold))
+ if (OutValue.Value.Size() <= m_DiskLayerSizeThreshold)
{
m_MemLayer.Put(InBucket, HashKey, OutValue);
}
@@ -88,6 +100,25 @@ ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCa
m_DiskLayer.Put(InBucket, HashKey, Value);
+#if ZEN_USE_REF_TRACKING
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ if (ValidateCompactBinary(Value.Value, CbValidateMode::All) == CbValidateError::None)
+ {
+ CbObject Object{SharedBuffer(Value.Value)};
+
+ uint8_t TempBuffer[8 * sizeof(IoHash)];
+ std::pmr::monotonic_buffer_resource Linear{TempBuffer, sizeof TempBuffer};
+ std::pmr::polymorphic_allocator Allocator{&Linear};
+ std::pmr::vector<IoHash> CidReferences{Allocator};
+
+ Object.IterateAttachments([&](CbFieldView Field) { CidReferences.push_back(Field.AsAttachment()); });
+
+ m_Gc.OnNewCidReferences(CidReferences);
+ }
+ }
+#endif
+
if (Value.Value.Size() <= m_DiskLayerSizeThreshold)
{
m_MemLayer.Put(InBucket, HashKey, Value);
@@ -131,10 +162,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheStore::GarbageCollect(GcContext& GcCtx)
+ZenCacheStore::GatherReferences(GcContext& GcCtx)
{
- m_DiskLayer.GarbageCollect(GcCtx);
- m_MemLayer.GarbageCollect(GcCtx);
+ m_MemLayer.GatherReferences(GcCtx);
+ m_DiskLayer.GatherReferences(GcCtx);
}
//////////////////////////////////////////////////////////////////////////
@@ -220,13 +251,13 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx)
+ZenCacheMemoryLayer::GatherReferences(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- Kv.second.GarbageCollect(GcCtx);
+ Kv.second.GatherReferences(GcCtx);
}
}
@@ -252,7 +283,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+ZenCacheMemoryLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
// Is it even meaningful to do this? The memory layer shouldn't
// contain anything which is not already in the disk layer
@@ -316,53 +347,106 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
//////////////////////////////////////////////////////////////////////////
-inline DiskLocation::DiskLocation() = default;
+#pragma pack(push)
+#pragma pack(1)
-inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
-: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
-, LowerSize(ValueSize & 0xFFFFffff)
-, IndexDataSize(IndexSize)
+struct DiskLocation
{
-}
+ inline DiskLocation() = default;
-inline uint64_t
-DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags)
-{
- return Offset | Flags;
-}
+ inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+ : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+ , LowerSize(ValueSize & 0xFFFFffff)
+ , IndexDataSize(IndexSize)
+ {
+ }
-inline uint64_t
-DiskLocation::Offset() const
-{
- return OffsetAndFlags & kOffsetMask;
-}
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
-inline uint64_t
-DiskLocation::Size() const
-{
- return LowerSize;
-}
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
-inline uint64_t
-DiskLocation::IsFlagSet(uint64_t Flag) const
-{
- return OffsetAndFlags & Flag;
-}
+ inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
+ inline uint64_t Size() const { return LowerSize; }
+ inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
+ inline ZenContentType GetContentType() const
+ {
+ ZenContentType ContentType = ZenContentType::kBinary;
-inline ZenContentType
-DiskLocation::GetContentType() const
-{
- ZenContentType ContentType = ZenContentType::kBinary;
+ if (IsFlagSet(DiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
+ }
- if (IsFlagSet(DiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
+ return ContentType;
}
- return ContentType;
-}
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+};
-//////////////////////////////////////////////////////////////////////////
+struct DiskIndexEntry
+{
+ IoHash Key;
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 36);
+
+struct ZenCacheDiskLayer::CacheBucket
+{
+ CacheBucket();
+ ~CacheBucket();
+
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ static bool Delete(std::filesystem::path BucketDir);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+
+ inline bool IsOk() const { return m_IsOk; }
+
+private:
+ std::filesystem::path m_BucketDir;
+ Oid m_BucketId;
+ bool m_IsOk = false;
+ uint64_t m_LargeObjectThreshold = 64 * 1024;
+
+ // These files are used to manage storage of small objects for this bucket
+
+ BasicFile m_SobsFile;
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
+};
ZenCacheDiskLayer::CacheBucket::CacheBucket()
{
@@ -454,16 +538,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
m_SlogFile.Open(SlogPath, IsNew);
- uint64_t MaxFileOffset = 0;
+ uint64_t MaxFileOffset = 0;
+ uint64_t InvalidEntryCount = 0;
if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty())
{
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
- m_Index[Record.Key] = Record.Location;
+ if (Record.Key == IoHash::Zero)
+ {
+ ++InvalidEntryCount;
+ }
+ else
+ {
+ m_Index[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size());
+ }
});
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, SlogPath);
+ }
+
m_WriteCursor = (MaxFileOffset + 15) & ~15;
}
@@ -661,7 +758,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx)
+ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_IndexLock);
@@ -924,7 +1021,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
{
// New bucket needs to be created
- std::string BucketName8 = WideToUtf8(BucketName);
+ const std::string BucketName8 = ToUtf8(BucketName);
if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end())
{
@@ -940,7 +1037,11 @@ ZenCacheDiskLayer::DiscoverBuckets()
Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
- if (!Bucket.IsOk())
+ if (Bucket.IsOk())
+ {
+ ZEN_INFO("Discovered bucket '{}'", BucketName8);
+ }
+ else
{
ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir);
@@ -1007,13 +1108,13 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
}
void
-ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx)
+ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
{
RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
- Kv.second.GarbageCollect(GcCtx);
+ Kv.second.GatherReferences(GcCtx);
}
}
@@ -1028,7 +1129,9 @@ TEST_CASE("z$.store")
ScopedTemporaryDirectory TempDir;
- ZenCacheStore Zcs(TempDir.Path() / "cache");
+ CasGc Gc;
+
+ ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
const int kIterationCount = 100;
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 0dfcbc5ca..8e1260b52 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -10,11 +10,11 @@
#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
-#pragma warning(push)
-#pragma warning(disable : 4127)
+ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
-#pragma warning(pop)
+ZEN_THIRD_PARTY_INCLUDES_END
#include <compare>
#include <filesystem>
@@ -22,8 +22,10 @@
namespace zen {
-class WideStringBuilderBase;
class CasStore;
+class CasGc;
+class WideStringBuilderBase;
+class ZenCacheTracker;
/******************************************************************************
@@ -52,6 +54,9 @@ struct ZenCacheValue
Intended for small values which are frequently accessed
+ This should have a better memory management policy to maintain reasonable
+ footprint.
+
*/
class ZenCacheMemoryLayer
{
@@ -63,7 +68,7 @@ public:
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
+ void GatherReferences(GcContext& GcCtx);
struct Configuration
{
@@ -89,7 +94,7 @@ private:
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
+ void GatherReferences(GcContext& GcCtx);
private:
uint64_t GetCurrentTimeStamp();
@@ -98,44 +103,11 @@ private:
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
Configuration m_Configuration;
-};
-#pragma pack(push)
-#pragma pack(1)
-
-struct DiskLocation
-{
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
-
- DiskLocation();
- DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags);
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags);
- uint64_t Offset() const;
- uint64_t Size() const;
- uint64_t IsFlagSet(uint64_t Flag) const;
- ZenContentType GetContentType() const;
-
-private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
+ ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete;
+ ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete;
};
-struct DiskIndexEntry
-{
- IoHash Key;
- DiskLocation Location;
-};
-
-#pragma pack(pop)
-
-static_assert(sizeof(DiskIndexEntry) == 36);
-
class ZenCacheDiskLayer
{
public:
@@ -147,7 +119,7 @@ public:
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
+ void GatherReferences(GcContext& GcCtx);
void DiscoverBuckets();
@@ -155,78 +127,39 @@ private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket
- {
- CacheBucket();
- ~CacheBucket();
-
- void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- static bool Delete(std::filesystem::path BucketDir);
-
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
- void Drop();
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
-
- inline bool IsOk() const { return m_IsOk; }
-
- private:
- std::filesystem::path m_BucketDir;
- Oid m_BucketId;
- bool m_IsOk = false;
- uint64_t m_LargeObjectThreshold = 64 * 1024;
-
- // These files are used to manage storage of small objects for this bucket
-
- BasicFile m_SobsFile;
- TCasLogFile<DiskIndexEntry> m_SlogFile;
-
- RwLock m_IndexLock;
- tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
- uint64_t m_WriteCursor = 0;
-
- void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
- bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
-
- // These locks are here to avoid contention on file creation, therefore it's sufficient
- // that we take the same lock for the same hash
- //
- // These locks are small and should really be spaced out so they don't share cache lines,
- // but we don't currently access them at particularly high frequency so it should not be
- // an issue in practice
-
- RwLock m_ShardedLocks[256];
- inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
- };
+ struct CacheBucket;
std::filesystem::path m_RootDir;
RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive
+
+ ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
+ ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
};
-class ZenCacheStore
+class ZenCacheStore : public GcContributor
{
public:
- explicit ZenCacheStore(const std::filesystem::path& RootDir);
+ ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir);
~ZenCacheStore();
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
- bool DropBucket(std::string_view Bucket);
- void Flush();
- void Scrub(ScrubContext& Ctx);
- void GarbageCollect(GcContext& GcCtx);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ bool DropBucket(std::string_view Bucket);
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ virtual void GatherReferences(GcContext& GcCtx) override;
private:
- std::filesystem::path m_RootDir;
- ZenCacheMemoryLayer m_MemLayer;
- ZenCacheDiskLayer m_DiskLayer;
- uint64_t m_DiskLayerSizeThreshold = 4 * 1024;
- uint64_t m_LastScrubTime = 0;
+ std::filesystem::path m_RootDir;
+ ZenCacheMemoryLayer m_MemLayer;
+ ZenCacheDiskLayer m_DiskLayer;
+ uint64_t m_DiskLayerSizeThreshold = 1 * 1024;
+ uint64_t m_LastScrubTime = 0;
+ std::unique_ptr<ZenCacheTracker> m_AccessTracker;
+
+ ZenCacheStore(const ZenCacheStore&) = delete;
+ ZenCacheStore& operator=(const ZenCacheStore&) = delete;
};
void z$_forcelink();
diff --git a/zenserver/config.h b/zenserver/config.h
index 36156a570..f6858b940 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -9,6 +9,14 @@
# define ZEN_ENABLE_MESH 0
#endif
+#ifndef ZEN_USE_NAMED_PIPES
+# define ZEN_USE_NAMED_PIPES 0
+#endif
+
+#ifndef ZEN_USE_EXEC
+# define ZEN_USE_EXEC 0
+#endif
+
struct ZenServerOptions
{
bool IsDebug = false;
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index 6e2559f1f..728001202 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -16,7 +16,10 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <spdlog/sinks/stdout_color_sinks.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#include <zencore/filesystem.h>
#include <zencore/string.h>
+
+#include <chrono>
#include <memory>
// Custom logging -- test code, this should be tweaked
@@ -236,7 +239,7 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
/* truncate */ false,
uint16_t(/* max files */ 14));
#else
- auto FileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()),
+ auto FileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::ToUtf8(LogPath),
/* max size */ 128 * 1024 * 1024,
/* max files */ 16,
/* rotate on open */ true);
@@ -272,12 +275,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
auto HttpLogger = std::make_shared<spdlog::logger>("http_requests", HttpSink);
spdlog::register_logger(HttpLogger);
- // Jupiter - only log HTTP traffic to file
+ // Jupiter - only log upstream HTTP traffic to file
auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink);
spdlog::register_logger(JupiterLogger);
- // Zen - only log HTTP traffic to file
+ // Zen - only log upstream HTTP traffic to file
auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink);
spdlog::register_logger(ZenClientLogger);
@@ -288,6 +291,10 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
spdlog::flush_on(spdlog::level::err);
spdlog::flush_every(std::chrono::seconds{2});
spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now()));
+
+ FileSink->set_pattern("[%C-%m-%d.%e %T] [%n] [%l] %v");
+
+ spdlog::info("log starting at {:%FT%T%z}", std::chrono::system_clock::now());
}
void
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 73d61c124..8f8b6e163 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -10,14 +10,20 @@
#include <zencore/logging.h>
#include <zencore/stream.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include "config.h"
+
#define USE_ROCKSDB 0
+ZEN_THIRD_PARTY_INCLUDES_START
+
#if USE_ROCKSDB
# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
# include <rocksdb/db.h>
@@ -25,6 +31,8 @@
#include <xxh3.h>
#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#include <latch>
#include <string>
@@ -165,7 +173,16 @@ struct ProjectStore::OplogStorage : public RefCounted
Stopwatch Timer;
+ uint64_t InvalidEntries = 0;
+
m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
+
+ return;
+ }
+
IoBuffer OpBuffer(LogEntry.OpCoreSize);
const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
@@ -190,6 +207,11 @@ struct ProjectStore::OplogStorage : public RefCounted
Handler(Op, LogEntry);
});
+ if (InvalidEntries)
+ {
+ ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
+ }
+
ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
m_MaxLsn,
@@ -221,6 +243,8 @@ struct ProjectStore::OplogStorage : public RefCounted
const uint64_t WriteSize = Buffer.GetSize();
const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+ ZEN_ASSERT(WriteSize != 0);
+
XXH3_128Stream KeyHasher;
Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
@@ -298,6 +322,36 @@ ProjectStore::Oplog::Flush()
m_Storage->Flush();
}
+void
+ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const
+{
+ ZEN_UNUSED(Ctx);
+}
+
+void
+ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ std::vector<IoHash> Hashes;
+
+ for (const auto& Kv : m_ChunkMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.ContributeCids(Hashes);
+
+ Hashes.clear();
+
+ for (const auto& Kv : m_MetaMap)
+ {
+ Hashes.push_back(Kv.second);
+ }
+
+ GcCtx.ContributeCids(Hashes);
+}
+
bool
ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
{
@@ -748,9 +802,51 @@ ProjectStore::Project::DeleteOplog(std::string_view OplogId)
}
void
+ProjectStore::Project::DiscoverOplogs()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(WideToUtf8(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::string> Dirs;
+ } Visit;
+
+ Traversal.TraverseFileSystem(m_OplogStoragePath, Visit);
+
+ for (const std::string& Dir : Visit.Dirs)
+ {
+ OpenOplog(Dir);
+ }
+}
+
+void
ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const
{
- // TODO: should iterate over oplogs which are present on disk but not yet loaded
+ // TODO: should also iterate over oplogs which are present on disk but not yet loaded
+
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ for (auto& Kv : m_Oplogs)
+ {
+ Fn(Kv.second);
+ }
+}
+
+void
+ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn)
+{
+ // TODO: should also iterate over oplogs which are present on disk but not yet loaded
RwLock::SharedLockScope _(m_ProjectLock);
@@ -769,13 +865,20 @@ ProjectStore::Project::Flush()
void
ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ IterateOplogs([&](const Oplog& Ops) { Ops.Scrub(Ctx); });
+}
+
+void
+ProjectStore::Project::GatherReferences(GcContext& GcCtx)
+{
+ IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); });
}
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath)
-: m_Log(zen::logging::Get("project"))
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc)
+: GcContributor(Gc)
+, m_Log(zen::logging::Get("project"))
, m_ProjectBasePath(BasePath)
, m_CidStore(Store)
{
@@ -795,6 +898,45 @@ ProjectStore::BasePathForProject(std::string_view ProjectId)
}
void
+ProjectStore::DiscoverProjects()
+{
+ FileSystemTraversal Traversal;
+ struct Visitor : public FileSystemTraversal::TreeVisitor
+ {
+ virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent,
+ [[maybe_unused]] const path_view& File,
+ [[maybe_unused]] uint64_t FileSize) override
+ {
+ }
+
+ virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override
+ {
+ Dirs.push_back(WideToUtf8(DirectoryName));
+ return false;
+ }
+
+ std::vector<std::string> Dirs;
+ } Visit;
+
+ if (!std::filesystem::exists(m_ProjectBasePath))
+ {
+ return;
+ }
+
+ Traversal.TraverseFileSystem(m_ProjectBasePath, Visit);
+
+ for (const auto& Dir : Visit.Dirs)
+ {
+ Project* Project = OpenProject(Dir);
+
+ if (Project)
+ {
+ Project->DiscoverOplogs();
+ }
+ }
+}
+
+void
ProjectStore::Flush()
{
// TODO
@@ -818,6 +960,19 @@ ProjectStore::Scrub(ScrubContext& Ctx)
}
}
+void
+ProjectStore::GatherReferences(GcContext& GcCtx)
+{
+ DiscoverProjects();
+
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ for (auto& Kv : m_Projects)
+ {
+ Kv.second.GatherReferences(GcCtx);
+ }
+}
+
ProjectStore::Project*
ProjectStore::OpenProject(std::string_view ProjectId)
{
@@ -1440,7 +1595,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId);
+ ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
HttpReq.WriteResponse(HttpResponseCode::Created);
},
@@ -1678,6 +1833,8 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request)
}
}
+#if ZEN_USE_NAMED_PIPES
+
//////////////////////////////////////////////////////////////////////////
class SecurityAttributes
@@ -1957,6 +2114,25 @@ LocalProjectService::~LocalProjectService()
m_Impl->Stop();
}
+#endif
+
//////////////////////////////////////////////////////////////////////////
+#if ZEN_WITH_TESTS
+
+TEST_CASE("prj.store")
+{
+ using namespace fmt::literals;
+ using namespace std::literals;
+
+ ScopedTemporaryDirectory TempDir;
+}
+
+#endif
+
+void
+prj_forcelink()
+{
+}
+
} // namespace zen
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index c9f49217a..43acdf05f 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -2,20 +2,24 @@
#pragma once
+#include <zencore/logging.h>
#include <zencore/uid.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpserver.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
+#include <zenstore/gc.h>
-#include <tsl/robin_map.h>
-#include <zencore/logging.h>
#include <filesystem>
#include <map>
#include <optional>
#include <string>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class CbPackage;
@@ -46,12 +50,12 @@ static_assert(IsPow2(sizeof(OplogEntry)));
/** Project Store
*/
-class ProjectStore : public RefCounted
+class ProjectStore : public RefCounted, public GcContributor
{
struct OplogStorage;
public:
- ProjectStore(CidStore& Store, std::filesystem::path BasePath);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc);
~ProjectStore();
struct Project;
@@ -102,7 +106,8 @@ public:
spdlog::logger& Log() { return m_OuterProject->Log(); }
void Flush();
- void Scrub(ScrubContext& Ctx);
+ void Scrub(ScrubContext& Ctx) const;
+ void GatherReferences(GcContext& GcCtx);
std::size_t OplogCount() const { return m_LatestOpMap.size(); }
@@ -121,7 +126,7 @@ public:
std::filesystem::path m_BasePath;
std::filesystem::path m_TempPath;
- RwLock m_OplogLock;
+ mutable RwLock m_OplogLock;
OidMap<IoHash> m_ChunkMap; // output data chunk id -> CAS address
OidMap<IoHash> m_MetaMap; // meta chunk id -> CAS address
OidMap<FileMapEntry> m_FileMap; // file id -> file map entry
@@ -148,6 +153,8 @@ public:
Oplog* OpenOplog(std::string_view OplogId);
void DeleteOplog(std::string_view OplogId);
void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const;
+ void IterateOplogs(std::function<void(Oplog&)>&& Fn);
+ void DiscoverOplogs();
Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
~Project();
@@ -158,6 +165,7 @@ public:
void Flush();
void Scrub(ScrubContext& Ctx);
spdlog::logger& Log();
+ void GatherReferences(GcContext& GcCtx);
private:
ProjectStore* m_ProjectStore;
@@ -181,10 +189,13 @@ public:
bool Exists(std::string_view ProjectId);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void DiscoverProjects();
spdlog::logger& Log() { return m_Log; }
const std::filesystem::path& BasePath() const { return m_ProjectBasePath; }
+ virtual void GatherReferences(GcContext& GcCtx) override;
+
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
@@ -257,4 +268,6 @@ private:
std::unique_ptr<LocalProjectImpl> m_Impl;
};
+void prj_forcelink();
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index d549c2fc4..df975df1f 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -10,12 +10,10 @@
#include <zencore/uid.h>
#include <zencore/zencore.h>
-#pragma warning(push)
-#pragma warning(disable : 4127)
+ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
-#pragma warning(pop)
-
#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
#include <chrono>
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index b36d84c22..567f1d40b 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -43,7 +43,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# define BUILD_VERSION ("dev-build")
#endif
-#define ZEN_SCHEMA_VERSION 1
+#define ZEN_SCHEMA_VERSION 2 /* latest change by: stefan boberg */
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
@@ -214,18 +214,24 @@ public:
m_CasStore->Initialize(Config);
m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
+ m_Gc.SetCidStore(m_CidStore.get());
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects");
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_Gc);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore});
+
+#if ZEN_USE_NAMED_PIPES
m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore);
+#endif
ZEN_INFO("instantiating compute services");
+#if ZEN_USE_EXEC
std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
zen::CreateDirectories(SandboxDir);
m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+#endif
std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
zen::CreateDirectories(ApplySandboxDir);
@@ -267,10 +273,12 @@ public:
m_Http->RegisterService(*m_StructuredCacheService);
}
+#if ZEN_USE_EXEC
if (m_HttpLaunchService)
{
m_Http->RegisterService(*m_HttpLaunchService);
}
+#endif
if (m_HttpFunctionService)
{
@@ -432,6 +440,16 @@ public:
NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
}
+ void CollectGarbage()
+ {
+ Stopwatch Timer;
+ ZEN_INFO("Garbage collection STARTING");
+
+ m_Gc.CollectGarbage();
+
+ ZEN_INFO("Garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+
void Flush()
{
if (m_CasStore)
@@ -495,17 +513,15 @@ private:
zen::Ref<zen::HttpServer> m_Http;
zen::HttpStatusService m_StatusService;
zen::HttpStatsService m_StatsService;
- std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
+ zen::CasGc m_Gc;
+ std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_Gc)};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
- zen::CasGc m_Gc{*m_CasStore};
zen::CasScrubber m_Scrubber{*m_CasStore};
zen::HttpTestService m_TestService;
zen::HttpTestingService m_TestingService;
zen::HttpCasService m_CasService{*m_CasStore};
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
- zen::Ref<zen::LocalProjectService> m_LocalProjectService;
- std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
zen::HttpAdminService m_AdminService;
@@ -514,6 +530,14 @@ private:
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
+#if ZEN_USE_EXEC
+ std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
+#endif
+
+#if ZEN_USE_NAMED_PIPES
+ zen::Ref<zen::LocalProjectService> m_LocalProjectService;
+#endif
+
bool m_DebugOptionForcedCrash = false;
};
@@ -580,6 +604,9 @@ ZenServer::InitializeState(ZenServiceConfig& ServiceConfig)
}
}
+ // Release any open handles so we can overwrite the manifest
+ ManifestData = {};
+
// Handle any state wipe
if (WipeState)
@@ -629,7 +656,7 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; };
ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(m_DataRoot / "cache");
+ m_CacheStore = std::make_unique<ZenCacheStore>(m_Gc, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
@@ -734,12 +761,8 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
}
}
- m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore,
- *m_CasStore,
- *m_CidStore,
- m_StatsService,
- m_StatusService,
- std::move(UpstreamCache)));
+ m_StructuredCacheService.reset(
+ new zen::HttpStructuredCacheService(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache)));
}
} // namespace zen
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index 935979cc3..d1719000b 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -105,6 +105,7 @@
</ItemDefinitionGroup>
<ItemGroup>
<ClInclude Include="admin\admin.h" />
+ <ClInclude Include="cache\cachetracking.h" />
<ClInclude Include="cache\structuredcache.h" />
<ClInclude Include="cache\structuredcachestore.h" />
<ClInclude Include="compute\apply.h" />
@@ -132,6 +133,7 @@
</ItemGroup>
<ItemGroup>
<ClCompile Include="admin\admin.cpp" />
+ <ClCompile Include="cache\cachetracking.cpp" />
<ClCompile Include="cache\structuredcache.cpp" />
<ClCompile Include="cache\structuredcachestore.cpp" />
<ClCompile Include="compute\apply.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index a6e57d8b7..9b248a38b 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -44,6 +44,7 @@
<ClInclude Include="upstream\upstreamapply.h">
<Filter>upstream</Filter>
</ClInclude>
+ <ClInclude Include="cache\cachetracking.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -85,6 +86,7 @@
<ClCompile Include="diag\diagsvcs.cpp">
<Filter>diag</Filter>
</ClCompile>
+ <ClCompile Include="cache\cachetracking.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="cache">
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index a4bbfa340..86c6eb849 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -5,6 +5,9 @@
#include "compactcas.h"
#include "filecas.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -14,6 +17,7 @@
#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/gc.h>
#include <gsl/gsl-lite.hpp>
@@ -67,34 +71,6 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba
//////////////////////////////////////////////////////////////////////////
-struct GcContext::GcState
-{
- CasChunkSet m_CasChunks;
- CasChunkSet m_CidChunks;
-};
-
-GcContext::GcContext() : m_State(std::make_unique<GcState>())
-{
-}
-
-GcContext::~GcContext()
-{
-}
-
-void
-GcContext::ContributeCids(std::span<const IoHash> Cids)
-{
- m_State->m_CidChunks.AddChunksToSet(Cids);
-}
-
-void
-GcContext::ContributeCas(std::span<const IoHash> Cas)
-{
- m_State->m_CasChunks.AddChunksToSet(Cas);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
void
ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks)
{
@@ -119,7 +95,7 @@ ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes)
class CasImpl : public CasStore
{
public:
- CasImpl();
+ CasImpl(CasGc& Gc);
virtual ~CasImpl();
virtual void Initialize(const CasStoreConfiguration& InConfig) override;
@@ -128,14 +104,27 @@ public:
virtual void FilterChunks(CasChunkSet& InOutChunks) override;
virtual void Flush() override;
virtual void Scrub(ScrubContext& Ctx) override;
+ virtual void GarbageCollect(GcContext& GcCtx) override;
private:
CasContainerStrategy m_TinyStrategy;
CasContainerStrategy m_SmallStrategy;
FileCasStrategy m_LargeStrategy;
+ CbObject m_ManifestObject;
+
+ enum class StorageScheme
+ {
+ Legacy = 0,
+ WithCbManifest = 1
+ };
+
+ StorageScheme m_StorageScheme = StorageScheme::Legacy;
+
+ bool OpenOrCreateManifest();
+ void UpdateManifest();
};
-CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config)
+CasImpl::CasImpl(CasGc& Gc) : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config, Gc)
{
}
@@ -155,39 +144,100 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
std::filesystem::create_directories(m_Config.RootDirectory);
// Open or create manifest
- //
- // The manifest is not currently fully implemented. The goal is to
- // use it for recovery and configuration
+ const bool IsNewStore = OpenOrCreateManifest();
+
+ // Initialize payload storage
+
+ m_LargeStrategy.Initialize(IsNewStore);
+ m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
+ m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+}
+
+bool
+CasImpl::OpenOrCreateManifest()
+{
bool IsNewStore = false;
- {
- std::filesystem::path ManifestPath = m_Config.RootDirectory;
- ManifestPath /= ".ucas_root";
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
- std::error_code Ec;
- BasicFile Marker;
- Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
+ std::error_code Ec;
+ BasicFile ManifestFile;
+ ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
- if (Ec)
+ bool ManifestIsOk = false;
+
+ if (Ec)
+ {
+ if (Ec == std::errc::no_such_file_or_directory)
{
IsNewStore = true;
+ }
+ }
+ else
+ {
+ IoBuffer ManifestBuffer = ManifestFile.ReadAll();
+ ManifestFile.Close();
- ExtendableStringBuilder<128> manifest;
- manifest.Append("#CAS_ROOT\n");
- manifest.Append("ID=");
- zen::Oid id = zen::Oid::NewOid();
- id.ToString(manifest);
-
- Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
- Marker.Write(manifest.c_str(), (DWORD)manifest.Size(), 0);
+ if (ManifestBuffer.Size() > 0 && ManifestBuffer.Data<uint8_t>()[0] == '#')
+ {
+ // Old-style manifest, does not contain any useful information, so we may as well update it
+ }
+ else
+ {
+ CbObject Manifest{SharedBuffer(ManifestBuffer)};
+ CbValidateError ValidationResult = ValidateCompactBinary(ManifestBuffer, CbValidateMode::All);
+
+ if (ValidationResult == CbValidateError::None)
+ {
+ if (Manifest["id"])
+ {
+ ManifestIsOk = true;
+ }
+ }
+ else
+ {
+ ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", ValidationResult);
+ }
+
+ if (ManifestIsOk)
+ {
+ m_ManifestObject = std::move(Manifest);
+ }
}
}
- // Initialize payload storage
+ if (!ManifestIsOk)
+ {
+ UpdateManifest();
+ }
- m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
- m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+ return IsNewStore;
+}
+
+void
+CasImpl::UpdateManifest()
+{
+ if (!m_ManifestObject)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << zen::Oid::NewOid() << "created" << DateTime::Now();
+ m_ManifestObject = Cbo.Save();
+ }
+
+ // Write manifest to file
+
+ std::filesystem::path ManifestPath = m_Config.RootDirectory;
+ ManifestPath /= ".ucas_root";
+
+ // This will throw on failure
+
+ ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
+
+ BasicFile Marker;
+ Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
+ Marker.Write(m_ManifestObject.GetBuffer(), 0);
}
CasStore::InsertResult
@@ -262,12 +312,18 @@ CasImpl::Scrub(ScrubContext& Ctx)
m_LargeStrategy.Scrub(Ctx);
}
+void
+CasImpl::GarbageCollect(GcContext& GcCtx)
+{
+ m_LargeStrategy.CollectGarbage(GcCtx);
+}
+
//////////////////////////////////////////////////////////////////////////
CasStore*
-CreateCasStore()
+CreateCasStore(CasGc& Gc)
{
- return new CasImpl();
+ return new CasImpl(Gc);
}
//////////////////////////////////////////////////////////////////////////
@@ -284,7 +340,9 @@ TEST_CASE("CasStore")
CasStoreConfiguration config;
config.RootDirectory = TempDir.Path();
- std::unique_ptr<CasStore> Store{CreateCasStore()};
+ CasGc Gc;
+
+ std::unique_ptr<CasStore> Store{CreateCasStore(Gc)};
Store->Initialize(config);
ScrubContext Ctx;
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 2bac6affd..369bc55ad 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -46,7 +46,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
m_RecordSize = RecordSize;
std::error_code Ec;
- m_File.Open(FileName, IsCreate);
+ m_File.Open(FileName, IsCreate, Ec);
if (Ec)
{
@@ -55,7 +55,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
uint64_t AppendOffset = 0;
- if (IsCreate)
+ if (IsCreate || (m_File.FileSize() < sizeof(FileHeader)))
{
// Initialize log by writing header
FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
@@ -76,12 +76,18 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
{
- // TODO: provide more context!
- throw std::runtime_error("Mangled log header");
+ throw std::runtime_error("Mangled log header (invalid header magic) in '{}'"_format(FileName));
}
AppendOffset = m_File.FileSize();
- m_Header = Header;
+
+ // Adjust the offset to ensure we end up on a good boundary, in case there is some garbage appended
+
+ AppendOffset -= sizeof Header;
+ AppendOffset -= AppendOffset % RecordSize;
+ AppendOffset += sizeof Header;
+
+ m_Header = Header;
}
m_AppendOffset = AppendOffset;
@@ -96,6 +102,12 @@ CasLogFile::Close()
m_File.Close();
}
+uint64_t
+CasLogFile::GetLogSize()
+{
+ return m_File.FileSize();
+}
+
void
CasLogFile::Replay(std::function<void(const void*)>&& Handler)
{
@@ -125,6 +137,8 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
{
Handler(ReadBuffer.data() + (i * m_RecordSize));
}
+
+ m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount);
}
void
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 7a5d7bcf4..c91f69ff7 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -4,7 +4,9 @@
#include <zencore/compress.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/string.h>
#include <zenstore/CAS.h>
#include <zenstore/caslog.h>
@@ -65,9 +67,21 @@ struct CidStore::Impl
LogMapping(DecompressedId, Compressed);
}
- void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed)
+ void LogMapping(const IoHash& DecompressedId, const IoHash& CompressedHash)
{
- m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed});
+ ZEN_ASSERT(DecompressedId != CompressedHash);
+ m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = CompressedHash});
+ }
+
+ IoHash RemapCid(const IoHash& DecompressedId)
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
+ {
+ return It->second;
+ }
+
+ return IoHash::Zero;
}
IoBuffer FindChunkByCid(const IoHash& DecompressedId)
@@ -118,23 +132,35 @@ struct CidStore::Impl
m_LogFile.Open(SlogPath, IsNew);
+ ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize()));
+
uint64_t TombstoneCount = 0;
+ uint64_t InvalidCount = 0;
- m_LogFile.Replay([&](const IndexEntry& Ie) {
- if (Ie.Compressed != IoHash::Zero)
+ m_LogFile.Replay([&](const IndexEntry& Entry) {
+ if (Entry.Compressed != IoHash::Zero)
{
// Update
- m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed);
+ m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
}
else
{
- // Tombstone
- m_CidMap.erase(Ie.Uncompressed);
- ++TombstoneCount;
+ if (Entry.Uncompressed != IoHash::Zero)
+ {
+ // Tombstone
+ m_CidMap.erase(Entry.Uncompressed);
+ ++TombstoneCount;
+ }
+ else
+ {
+ // Completely uninitialized entry with both hashes set to zero indicates a
+ // problem. Might be an unwritten page due to BSOD or some other problem
+ ++InvalidCount;
+ }
}
});
- ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount);
+ ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
}
void Flush() { m_LogFile.Flush(); }
@@ -239,6 +265,12 @@ CidStore::FindChunkByCid(const IoHash& DecompressedId)
return m_Impl->FindChunkByCid(DecompressedId);
}
+IoHash
+CidStore::RemapCid(const IoHash& DecompressedId)
+{
+ return m_Impl->RemapCid(DecompressedId);
+}
+
bool
CidStore::ContainsChunk(const IoHash& DecompressedId)
{
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 612f87c7c..dbe5572b9 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -4,13 +4,19 @@
#include "CompactCas.h"
+#include <zencore/compactbinarybuilder.h>
#include <zencore/except.h>
+#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/gc.h>
+
#include <filesystem>
#include <functional>
#include <gsl/gsl-lite.hpp>
@@ -58,7 +64,7 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
m_LocationMap[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset + Record.Location.Size);
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize());
});
}
@@ -91,7 +97,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- CasDiskLocation Location{.Offset = InsertOffset, .Size = /* TODO FIX */ uint32_t(ChunkSize)};
+ const CasDiskLocation Location{InsertOffset, ChunkSize};
m_LocationMap[ChunkHash] = Location;
@@ -116,7 +122,8 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
const CasDiskLocation& Location = KeyIt->second;
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size);
+
+ return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
}
// Not found
@@ -187,11 +194,11 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryOffset = Entry.second.Offset;
+ const uint64_t EntryOffset = Entry.second.GetOffset();
if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
+ const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
if (EntryEnd >= WindowEnd)
{
@@ -201,7 +208,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size);
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
+ Entry.second.GetSize());
if (Entry.first != ComputedHash)
{
@@ -222,7 +230,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BigChunks)
{
IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) {
+ m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
IoHash ComputedHash = Hasher.GetHash();
@@ -258,6 +266,12 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
void
+CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
+{
+ ZEN_UNUSED(GcCtx);
+}
+
+void
CasContainerStrategy::MakeSnapshot()
{
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -275,4 +289,81 @@ CasContainerStrategy::MakeSnapshot()
m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0);
}
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("cas.compact.gc")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const int kIterationCount = 1000;
+
+ std::vector<IoHash> Keys(kIterationCount);
+
+ {
+ CasContainerStrategy Cas(CasConfig);
+ Cas.Initialize("test", 16, true);
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << i;
+ CbObject Obj = Cbo.Save();
+
+ IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+ const IoHash Hash = HashBuffer(ObjBuffer);
+
+ Cas.InsertChunk(ObjBuffer, Hash);
+
+ Keys[i] = Hash;
+ }
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+ }
+
+ // Validate that we can still read the inserted data after closing
+ // the original cas store
+
+ {
+ CasContainerStrategy Cas(CasConfig);
+ Cas.Initialize("test", 16, false);
+
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ IoBuffer Chunk = Cas.FindChunk(Keys[i]);
+
+ CHECK(!!Chunk);
+
+ CbObject Value = LoadCompactBinaryObject(Chunk);
+
+ CHECK_EQ(Value["id"].AsInt32(), i);
+ }
+
+ GcContext Ctx;
+ Cas.CollectGarbage(Ctx);
+ }
+}
+
+#endif
+
+void
+compactcas_forcelink()
+{
+}
+
} // namespace zen
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index a512c3d93..a3f3121e6 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -23,17 +23,42 @@ namespace zen {
struct CasDiskLocation
{
- uint64_t Offset;
- // If we wanted to be able to store larger chunks using this storage mechanism then
- // we could make this more like the IoStore index so we can store larger chunks.
- // I.e use five bytes for size and seven for offset
- uint32_t Size;
+ CasDiskLocation(uint64_t InOffset, uint64_t InSize)
+ {
+ ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
+ ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
+
+ memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
+ memcpy(&m_Size[0], &InSize, sizeof m_Size);
+ }
+
+ CasDiskLocation() = default;
+
+ inline uint64_t GetOffset() const
+ {
+ uint64_t Offset = 0;
+ memcpy(&Offset, &m_Offset, sizeof m_Offset);
+ return Offset;
+ }
+
+ inline uint64_t GetSize() const
+ {
+ uint64_t Size = 0;
+ memcpy(&Size, &m_Size, sizeof m_Size);
+ return Size;
+ }
+
+private:
+ uint8_t m_Offset[5];
+ uint8_t m_Size[5];
};
struct CasDiskIndexEntry
{
IoHash Key;
CasDiskLocation Location;
+ ZenContentType ContentType = ZenContentType::kUnknownContentType;
+ uint8_t Flags = 0;
};
#pragma pack(pop)
@@ -61,6 +86,7 @@ struct CasContainerStrategy
void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
+ void CollectGarbage(GcContext& GcCtx);
private:
const CasStoreConfiguration& m_Config;
@@ -80,4 +106,6 @@ private:
void MakeSnapshot();
};
+void compactcas_forcelink();
+
} // namespace zen
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index a37450cd8..0efdc96f5 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -14,6 +14,11 @@
#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
+#include <zenstore/gc.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+#endif
#include <gsl/gsl-lite.hpp>
@@ -65,7 +70,10 @@ FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& Roo
//////////////////////////////////////////////////////////////////////////
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas"))
+FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
+: GcStorage(Gc)
+, m_Config(Config)
+, m_Log(logging::Get("filecas"))
{
}
@@ -73,9 +81,23 @@ FileCasStrategy::~FileCasStrategy()
{
}
+void
+FileCasStrategy::Initialize(bool IsNewStore)
+{
+ m_IsInitialized = true;
+
+ CreateDirectories(m_Config.RootDirectory);
+
+ m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore);
+
+ m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { ZEN_UNUSED(Entry); });
+}
+
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
@@ -207,6 +229,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
if (Success)
{
+ m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
return CasStore::InsertResult{.New = true};
}
@@ -232,6 +256,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
CasStore::InsertResult
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
// See if file already exists
@@ -304,12 +330,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// *after* the lock is released due to the initialization order
PayloadFile.Close();
+ m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+
return {.New = true};
}
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -320,6 +350,8 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
+ ZEN_ASSERT(m_IsInitialized);
+
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
@@ -332,6 +364,7 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
return false;
}
+
void
FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
@@ -340,11 +373,18 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath));
std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+
+ if (!Ec)
+ {
+ m_CasLog.Append({.Key = ChunkHash, .Size = ~(0ull)});
+ }
}
void
FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
{
+ ZEN_ASSERT(m_IsInitialized);
+
// NOTE: it's not a problem now, but in the future if a GC should happen while this
// is in flight, the result could be wrong since chunks could go away in the meantime.
//
@@ -359,6 +399,8 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback)
{
+ ZEN_ASSERT(m_IsInitialized);
+
struct Visitor : public FileSystemTraversal::TreeVisitor
{
Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {}
@@ -430,6 +472,8 @@ FileCasStrategy::Flush()
void
FileCasStrategy::Scrub(ScrubContext& Ctx)
{
+ ZEN_ASSERT(m_IsInitialized);
+
std::vector<IoHash> BadHashes;
std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
@@ -476,9 +520,69 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
void
-FileCasStrategy::GarbageCollect(GcContext& GcCtx)
+FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
- ZEN_UNUSED(GcCtx);
+ ZEN_ASSERT(m_IsInitialized);
+
+ ZEN_INFO("collecting garbage from {}", m_Config.RootDirectory);
+
+ std::vector<IoHash> ChunksToDelete;
+ std::atomic<uint64_t> ChunksToDeleteBytes{0};
+ std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+
+ std::vector<IoHash> CandidateCas;
+
+ IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
+ bool KeepThis = false;
+ CandidateCas.clear();
+ CandidateCas.push_back(Hash);
+ GcCtx.FilterCas(CandidateCas, [&](const IoHash& Hash) {
+ ZEN_UNUSED(Hash);
+ KeepThis = true;
+ });
+
+ const uint64_t FileSize = Payload.FileSize();
+
+ if (!KeepThis)
+ {
+ ChunksToDelete.push_back(Hash);
+ ChunksToDeleteBytes.fetch_add(FileSize);
+ }
+
+ ++ChunkCount;
+ ChunkBytes.fetch_add(FileSize);
+ });
+
+ ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
+
+ if (ChunksToDelete.empty())
+ {
+ ZEN_INFO("nothing to delete");
+
+ return;
+ }
+
+ ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunksToDelete.size(), NiceBytes(ChunksToDeleteBytes));
+
+ if (GcCtx.IsDeletionMode() == false)
+ {
+ ZEN_INFO("NOTE: not actually deleting anything since deletion is disabled");
+
+ return;
+ }
+
+ for (const IoHash& Hash : ChunksToDelete)
+ {
+ ZEN_TRACE("deleting chunk {}", Hash);
+
+ std::error_code Ec;
+ DeleteChunk(Hash, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message());
+ }
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -489,12 +593,16 @@ TEST_CASE("cas.file.move")
{
using namespace fmt::literals;
- ScopedTemporaryDirectory TempDir{"d:\\filecas_testdir"};
+ // specifying an absolute path here can be helpful when using procmon to dig into things
+ ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
+
+ CasGc Gc;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path() / "cas";
- FileCasStrategy FileCas(CasConfig);
+ FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */ true);
{
std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
@@ -564,6 +672,84 @@ TEST_CASE("cas.file.move")
# endif
}
+TEST_CASE("cas.file.gc")
+{
+ // specifying an absolute path here can be helpful when using procmon to dig into things
+ ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"};
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+ CasGc Gc;
+ FileCasStrategy FileCas(CasConfig, Gc);
+ FileCas.Initialize(/* IsNewStore */ true);
+
+ const int kIterationCount = 1000;
+ std::vector<IoHash> Keys{kIterationCount};
+
+ auto InsertChunks = [&] {
+ for (int i = 0; i < kIterationCount; ++i)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "id" << i;
+ CbObject Obj = Cbo.Save();
+
+ IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer();
+ IoHash Hash = HashBuffer(ObjBuffer);
+
+ FileCas.InsertChunk(ObjBuffer, Hash);
+
+ Keys[i] = Hash;
+ }
+ };
+
+ // Drop everything
+
+ {
+ InsertChunks();
+
+ GcContext Ctx;
+ FileCas.CollectGarbage(Ctx);
+
+ for (const IoHash& Key : Keys)
+ {
+ IoBuffer Chunk = FileCas.FindChunk(Key);
+
+ CHECK(!Chunk);
+ }
+ }
+
+ // Keep roughly half of the chunks
+
+ {
+ InsertChunks();
+
+ GcContext Ctx;
+
+ for (const IoHash& Key : Keys)
+ {
+ if (Key.Hash[0] & 1)
+ {
+ Ctx.ContributeCas(std::vector<IoHash>{Key});
+ }
+ }
+
+ FileCas.CollectGarbage(Ctx);
+
+ for (const IoHash& Key : Keys)
+ {
+ if (Key.Hash[0] & 1)
+ {
+ CHECK(FileCas.FindChunk(Key));
+ }
+ else
+ {
+ CHECK(!FileCas.FindChunk(Key));
+ }
+ }
+ }
+}
+
#endif
void
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index 14314ce52..ec2ca3f31 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -9,6 +9,8 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
+#include <zenstore/gc.h>
#include <functional>
@@ -23,18 +25,19 @@ class BasicFile;
/** CAS storage strategy using a file-per-chunk storage strategy
*/
-struct FileCasStrategy
+struct FileCasStrategy : public GcStorage
{
- FileCasStrategy(const CasStoreConfiguration& Config);
+ FileCasStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
~FileCasStrategy();
+ void Initialize(bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(CasChunkSet& InOutChunks);
void Flush();
- void GarbageCollect(GcContext& GcCtx);
+ virtual void CollectGarbage(GcContext& GcCtx) override;
void Scrub(ScrubContext& Ctx);
private:
@@ -43,6 +46,18 @@ private:
RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
spdlog::logger& m_Log;
spdlog::logger& Log() { return m_Log; }
+ bool m_IsInitialized = false;
+
+ struct FileCasIndexEntry
+ {
+ IoHash Key;
+ uint32_t Pad = 0;
+ uint64_t Size = 0;
+ };
+
+ static_assert(sizeof(FileCasIndexEntry) == 32);
+
+ TCasLogFile<FileCasIndexEntry> m_CasLog;
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index bfb8f015e..278f09b0b 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -2,9 +2,97 @@
#include <zenstore/gc.h>
+#include <zenstore/CAS.h>
+#include <zenstore/cidstore.h>
+#include <zencore/logging.h>
+
namespace zen {
-CasGc::CasGc(CasStore& Store) : m_CasStore(Store)
+//////////////////////////////////////////////////////////////////////////
+
+struct GcContext::GcState
+{
+ CasChunkSet m_CasChunks;
+ CasChunkSet m_CidChunks;
+ bool m_DeletionMode = true;
+};
+
+GcContext::GcContext() : m_State(std::make_unique<GcState>())
+{
+}
+
+GcContext::~GcContext()
+{
+}
+
+void
+GcContext::ContributeCids(std::span<const IoHash> Cids)
+{
+ m_State->m_CidChunks.AddChunksToSet(Cids);
+}
+
+void
+GcContext::ContributeCas(std::span<const IoHash> Cas)
+{
+ m_State->m_CasChunks.AddChunksToSet(Cas);
+}
+
+void
+GcContext::IterateCids(std::function<void(const IoHash&)> Callback)
+{
+ m_State->m_CidChunks.IterateChunks([&](const IoHash& Hash) { Callback(Hash); });
+}
+
+void
+GcContext::FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc)
+{
+ m_State->m_CidChunks.FilterChunks(Cid, [&](const IoHash& Hash) { KeepFunc(Hash); });
+}
+
+void
+GcContext::FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc)
+{
+ m_State->m_CasChunks.FilterChunks(Cas, [&](const IoHash& Hash) { KeepFunc(Hash); });
+}
+
+bool
+GcContext::IsDeletionMode() const
+{
+ return m_State->m_DeletionMode;
+}
+void
+GcContext::SetDeletionMode(bool NewState)
+{
+ m_State->m_DeletionMode = NewState;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc)
+{
+ m_Gc.AddGcContributor(this);
+}
+
+GcContributor::~GcContributor()
+{
+ m_Gc.RemoveGcContributor(this);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc)
+{
+ m_Gc.AddGcStorage(this);
+}
+
+GcStorage::~GcStorage()
+{
+ m_Gc.RemoveGcStorage(this);  // unregister (was AddGcStorage): CasGc must not keep a pointer to a destroyed storage
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+CasGc::CasGc()
{
}
@@ -13,12 +101,103 @@ CasGc::~CasGc()
}
void
+CasGc::AddGcContributor(GcContributor* Contributor)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_GcContribs.push_back(Contributor);
+}
+
+void
+CasGc::RemoveGcContributor(GcContributor* Contributor)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);  // exclusive: the contributor list is mutated below
+ std::erase(m_GcContribs, Contributor);  // drops every registration of this pointer; avoids the non-portable `$` lambda parameter
+}
+
+void
+CasGc::AddGcStorage(GcStorage* Storage)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_GcStorage.push_back(Storage);
+}
+
+void
+CasGc::RemoveGcStorage(GcStorage* Storage)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);  // exclusive: the storage list is mutated below
+ std::erase(m_GcStorage, Storage);  // drops every registration of this pointer; avoids the non-portable `$` lambda parameter
+}
+
+void
CasGc::CollectGarbage()
{
+ RwLock::SharedLockScope _(m_Lock);
+
+ // First gather reference set
+
+ GcContext GcCtx;
+ GcCtx.SetDeletionMode(false);
+
+ for (GcContributor* Contributor : m_GcContribs)
+ {
+ Contributor->GatherReferences(GcCtx);
+ }
+
+ if (CidStore* CidStore = m_CidStore)
+ {
+ std::vector<IoHash> CasHashes;
+
+ int UnknownChunks = 0;
+
+ GcCtx.IterateCids([&](const IoHash& Hash) {
+ IoHash Cas = CidStore->RemapCid(Hash);
+
+ if (Cas == IoHash::Zero)
+ {
+ ++UnknownChunks;
+ }
+ else
+ {
+ CasHashes.push_back(Cas);
+ }
+ });
+
+ if (UnknownChunks)
+ {
+ ZEN_WARN("found {} unknown CIDs", UnknownChunks);
+ }
+
+ GcCtx.ContributeCas(CasHashes);
+ }
+
+ // Then trim storage
+
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ Storage->CollectGarbage(GcCtx);
+ }
+}
+
+void
+CasGc::SetCidStore(CidStore* Cids)
+{
+ m_CidStore = Cids;
+}
+
+void
+CasGc::OnNewCidReferences(std::span<IoHash> Hashes)
+{
+ ZEN_UNUSED(Hashes);
+}
+
+void
+CasGc::OnCommittedCidReferences(std::span<IoHash> Hashes)
+{
+ ZEN_UNUSED(Hashes);
}
void
-CasGc::OnNewReferences(std::span<IoHash> Hashes)
+CasGc::OnDroppedCidReferences(std::span<IoHash> Hashes)
{
ZEN_UNUSED(Hashes);
}
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index 86e7e78d9..5b508baa0 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -11,6 +11,7 @@
#include <zencore/timer.h>
#include <atomic>
+#include <concepts>
#include <filesystem>
#include <functional>
#include <memory>
@@ -19,6 +20,9 @@
namespace zen {
+class GcContext;
+class CasGc;
+
struct CasStoreConfiguration
{
// Root directory for CAS store
@@ -45,29 +49,22 @@ public:
inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
+ inline void FilterChunks(std::span<const IoHash> Candidates, std::invocable<const IoHash&> auto MatchFunc)
+ {
+ for (const IoHash& Candidate : Candidates)
+ {
+ if (ContainsChunk(Candidate))
+ {
+ MatchFunc(Candidate);
+ }
+ }
+ }
+
private:
// Q: should we protect this with a lock, or is that a higher level concern?
std::unordered_set<IoHash> m_ChunkSet;
};
-/** Garbage Collection context object
- */
-
-class GcContext
-{
-public:
- GcContext();
- ~GcContext();
-
- void ContributeCids(std::span<const IoHash> Cid);
- void ContributeCas(std::span<const IoHash> Hash);
-
-private:
- struct GcState;
-
- std::unique_ptr<GcState> m_State;
-};
-
/** Context object for data scrubbing
*
* Data scrubbing is when we traverse stored data to validate it and
@@ -116,13 +113,14 @@ public:
virtual void FilterChunks(CasChunkSet& InOutChunks) = 0;
virtual void Flush() = 0;
virtual void Scrub(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
protected:
CasStoreConfiguration m_Config;
uint64_t m_LastScrubTime = 0;
};
-ZENCORE_API CasStore* CreateCasStore();
+ZENCORE_API CasStore* CreateCasStore(CasGc& Gc);
void CAS_forcelink();
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 00b987383..bb9d07726 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -27,6 +27,7 @@ public:
void Replay(std::function<void(const void*)>&& Handler);
void Flush();
void Close();
+ uint64_t GetLogSize();
private:
struct FileHeader
@@ -57,6 +58,8 @@ template<typename T>
class TCasLogFile : public CasLogFile
{
public:
+ void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
+
// This should be called before the Replay() is called to do some basic sanity checking
bool Initialize() { return true; }
@@ -76,7 +79,6 @@ public:
CasLogFile::Append(&Record, sizeof Record);
}
- void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
};
} // namespace zen
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 5f567e7fc..a69569bd2 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -4,10 +4,13 @@
#include "zenstore.h"
-#include <tsl/robin_map.h>
#include <zencore/iohash.h>
#include <zenstore/CAS.h>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace std::filesystem {
class path;
}
@@ -54,6 +57,8 @@ public:
// TODO: add batch filter support
+ IoHash RemapCid(const IoHash& DecompressedId);
+
private:
struct Impl;
std::unique_ptr<Impl> m_Impl;
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index 055843547..560642803 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -3,26 +3,108 @@
#pragma once
#include <zencore/iohash.h>
+#include <zencore/thread.h>
#include <span>
+#include <functional>
+
+#define ZEN_USE_REF_TRACKING 0 // This is not currently functional
namespace zen {
class CasStore;
+class CasGc;
+class CidStore;
struct IoHash;
+/** Garbage Collection context object
+ */
+
+// NOTE(review): only the interface is visible in this header; the per-member notes
+// below are inferred from names and signatures and should be checked against the
+// implementation.
+class GcContext
+{
+public:
+ GcContext();
+ // Declared here rather than defaulted inline: GcState is only forward-declared in
+ // this header, and std::unique_ptr<GcState> needs the complete type where the
+ // destructor is defined.
+ ~GcContext();
+
+ // Presumably record the given hashes as referenced (i.e. to be retained) - confirm.
+ void ContributeCids(std::span<const IoHash> Cid);
+ void ContributeCas(std::span<const IoHash> Hash);
+
+ // Callback takes a const IoHash&; presumably invoked once per contributed CID - confirm.
+ void IterateCids(std::function<void(const IoHash&)> Callback);
+
+ // KeepFunc takes a const IoHash&; presumably invoked for the subset of the given
+ // hashes that should be kept - confirm which subset against the implementation.
+ void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
+ void FilterCas(std::span<const IoHash> Cas, std::function<void(const IoHash&)> KeepFunc);
+
+ // Accessor pair for a boolean "deletion mode" setting; its effect is not visible here.
+ bool IsDeletionMode() const;
+ void SetDeletionMode(bool NewState);
+
+private:
+ // Opaque state type, defined outside this header.
+ struct GcState;
+
+ // NOTE(review): std::unique_ptr is used but <memory> is not among the includes
+ // visible in this header; presumably pulled in transitively - confirm.
+ std::unique_ptr<GcState> m_State;
+};
+
+/** GC root contributor
+
+ Higher level data structures provide roots for the garbage collector,
+ which ultimately determine what is garbage and what data we need to
+ retain.
+
+ */
+
+class GcContributor
+{
+public:
+ // Binds this contributor to the given GC orchestrator (kept by reference in m_Gc).
+ GcContributor(CasGc& Gc);
+
+ // Virtual because this is a polymorphic base class with a public destructor:
+ // destroying a derived object through a GcContributor* must run the derived destructor.
+ virtual ~GcContributor();
+
+ // Implemented by derived classes; receives the context object of the GC pass.
+ virtual void GatherReferences(GcContext& GcCtx) = 0;
+
+protected:
+ // The orchestrator passed to the constructor; held by reference, so it must
+ // outlive this object.
+ CasGc& m_Gc;
+};
+
+/** GC storage provider
+ */
+
+class GcStorage
+{
+public:
+ // Binds this storage provider to the given GC orchestrator (kept by reference in m_Gc).
+ GcStorage(CasGc& Gc);
+
+ // Virtual because this is a polymorphic base class with a public destructor:
+ // destroying a derived object through a GcStorage* must run the derived destructor.
+ virtual ~GcStorage();
+
+ // Implemented by derived classes; CasGc::CollectGarbage() calls this on every
+ // registered storage, passing the context object of the GC pass.
+ virtual void CollectGarbage(GcContext& GcCtx) = 0;
+
+private:
+ // The orchestrator passed to the constructor; held by reference, so it must
+ // outlive this object.
+ CasGc& m_Gc;
+};
+
+/** GC orchestrator
+ */
+
class CasGc
{
public:
- CasGc(CasStore& Store);
+ CasGc();
~CasGc();
+ // Registration of GC root contributors.
+ // NOTE(review): pointers are stored raw in m_GcContribs; presumably non-owning,
+ // with each contributor expected to remove itself before it is destroyed - confirm.
+ void AddGcContributor(GcContributor* Contributor);
+ void RemoveGcContributor(GcContributor* Contributor);
+
+ // Registration of GC storage providers (stored raw in m_GcStorage; same caveat as above).
+ void AddGcStorage(GcStorage* Contributor);
+ void RemoveGcStorage(GcStorage* Contributor);
+
void CollectGarbage();
- void OnNewReferences(std::span<IoHash> Hashes);
+ // Sets the CidStore pointer kept in m_CidStore.
+ void SetCidStore(CidStore* Cids);
+ // Reference-change notifications; the definitions in this change are no-op stubs.
+ void OnNewCidReferences(std::span<IoHash> Hashes);
+ void OnCommittedCidReferences(std::span<IoHash> Hashes);
+ void OnDroppedCidReferences(std::span<IoHash> Hashes);
private:
- CasStore& m_CasStore;
+ // NOTE(review): presumably guards the two registration vectors below - confirm.
+ RwLock m_Lock;
+ std::vector<GcContributor*> m_GcContribs;
+ // Every entry has CollectGarbage(GcCtx) called on it by CasGc::CollectGarbage().
+ std::vector<GcStorage*> m_GcStorage;
+ // Null until SetCidStore() is called. The in-class initializer guarantees the
+ // pointer never holds an indeterminate value, whatever the constructor does.
+ CidStore* m_CidStore = nullptr;
};
} // namespace zen
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index d852fa64b..9fdf2dccf 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -4,6 +4,7 @@
#include <zenstore/CAS.h>
#include <zenstore/basicfile.h>
+#include "compactcas.h"
#include "filecas.h"
namespace zen {
@@ -14,6 +15,7 @@ zenstore_forcelinktests()
basicfile_forcelink();
CAS_forcelink();
filecas_forcelink();
+ compactcas_forcelink();
}
} // namespace zen
diff --git a/zenstore/zenstore.vcxproj b/zenstore/zenstore.vcxproj
index eb2ecd02b..832ea8159 100644
--- a/zenstore/zenstore.vcxproj
+++ b/zenstore/zenstore.vcxproj
@@ -97,7 +97,6 @@
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
@@ -111,7 +110,6 @@
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
diff --git a/zenutil/zenutil.vcxproj b/zenutil/zenutil.vcxproj
index f5db7c5b0..e0c034c6f 100644
--- a/zenutil/zenutil.vcxproj
+++ b/zenutil/zenutil.vcxproj
@@ -68,7 +68,6 @@
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
@@ -81,7 +80,6 @@
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
- <WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>