aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-04-07 18:22:26 +0200
committerGitHub <[email protected]>2022-04-07 18:22:26 +0200
commit487352bb3e1de5a96268616bb335f9ef857cd629 (patch)
tree4b03eb73f02bf03d1b4671775c1c5277a7d7341a
parentAdd pre-commit config (#69) (diff)
parentclean up variable naming (diff)
downloadzen-487352bb3e1de5a96268616bb335f9ef857cd629.tar.xz
zen-487352bb3e1de5a96268616bb335f9ef857cd629.zip
Merge pull request #58 from EpicGames/de/cas-store-with-block-store
de/cas store with block store
-rw-r--r--zencore/filesystem.cpp4
-rw-r--r--zencore/include/zencore/string.h105
-rw-r--r--zencore/iobuffer.cpp5
-rw-r--r--zenserver/cache/structuredcachestore.cpp60
-rw-r--r--zenserver/config.cpp8
-rw-r--r--zenserver/config.h1
-rw-r--r--zenserver/projectstore.cpp58
-rw-r--r--zenserver/zenserver.cpp1
-rw-r--r--zenstore/basicfile.cpp177
-rw-r--r--zenstore/blockstore.cpp242
-rw-r--r--zenstore/cas.cpp8
-rw-r--r--zenstore/caslog.cpp46
-rw-r--r--zenstore/cidstore.cpp38
-rw-r--r--zenstore/compactcas.cpp2443
-rw-r--r--zenstore/compactcas.h90
-rw-r--r--zenstore/filecas.cpp73
-rw-r--r--zenstore/gc.cpp419
-rw-r--r--zenstore/include/zenstore/basicfile.h17
-rw-r--r--zenstore/include/zenstore/blockstore.h104
-rw-r--r--zenstore/include/zenstore/cas.h2
-rw-r--r--zenstore/include/zenstore/caslog.h40
-rw-r--r--zenstore/include/zenstore/gc.h4
-rw-r--r--zenstore/zenstore.cpp2
23 files changed, 3425 insertions, 522 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 041abaf1d..e2778089b 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -921,6 +921,10 @@ PathFromHandle(void* NativeHandle)
}
const DWORD RequiredLengthIncludingNul = GetFinalPathNameByHandleW(NativeHandle, nullptr, 0, FILE_NAME_OPENED);
+ if (RequiredLengthIncludingNul == 0)
+ {
+ ThrowLastError(fmt::format("failed to get path from file handle {}", NativeHandle));
+ }
std::wstring FullPath;
FullPath.resize(RequiredLengthIncludingNul - 1);
diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h
index 4c378730f..e502120ac 100644
--- a/zencore/include/zencore/string.h
+++ b/zencore/include/zencore/string.h
@@ -9,6 +9,7 @@
#include <string.h>
#include <charconv>
#include <codecvt>
+#include <concepts>
#include <optional>
#include <span>
#include <string_view>
@@ -488,6 +489,26 @@ std::string WideToUtf8(const wchar_t* wstr);
void WideToUtf8(const std::wstring_view& wstr, StringBuilderBase& out);
std::string WideToUtf8(const std::wstring_view Wstr);
+inline uint8_t
+Char2Nibble(char c)
+{
+ if (c >= '0' && c <= '9')
+ {
+ return uint8_t(c - '0');
+ }
+ if (c >= 'a' && c <= 'f')
+ {
+ return uint8_t(c - 'a' + 10);
+ }
+ if (c >= 'A' && c <= 'F')
+ {
+ return uint8_t(c - 'A' + 10);
+ }
+ return uint8_t(0xff);
+};
+
+static constexpr const char HexChars[] = "0123456789abcdef";
+
/// <summary>
/// Parse hex string into a byte buffer
/// </summary>
@@ -501,38 +522,56 @@ ParseHexBytes(const char* InputString, size_t CharacterCount, uint8_t* OutPtr)
{
ZEN_ASSERT((CharacterCount & 1) == 0);
- auto char2nibble = [](char c) {
- uint8_t c8 = uint8_t(c - '0');
+ uint8_t allBits = 0;
- if (c8 < 10)
- return c8;
+ while (CharacterCount)
+ {
+ uint8_t n0 = Char2Nibble(InputString[0]);
+ uint8_t n1 = Char2Nibble(InputString[1]);
- c8 -= 'A' - '0' - 10;
+ allBits |= n0 | n1;
- if (c8 < 16)
- return c8;
+ *OutPtr = (n0 << 4) | n1;
- c8 -= 'a' - 'A';
+ OutPtr += 1;
+ InputString += 2;
+ CharacterCount -= 2;
+ }
- if (c8 < 16)
- return c8;
+ return (allBits & 0x80) == 0;
+}
- return uint8_t(0xff);
- };
+inline void
+ToHexBytes(const uint8_t* InputData, size_t ByteCount, char* OutString)
+{
+ while (ByteCount--)
+ {
+ uint8_t byte = *InputData++;
+
+ *OutString++ = HexChars[byte >> 4];
+ *OutString++ = HexChars[byte & 15];
+ }
+}
+
+inline bool
+ParseHexNumber(const char* InputString, size_t CharacterCount, uint8_t* OutPtr)
+{
+ ZEN_ASSERT((CharacterCount & 1) == 0);
uint8_t allBits = 0;
+ InputString += CharacterCount;
while (CharacterCount)
{
- uint8_t n0 = char2nibble(InputString[0]);
- uint8_t n1 = char2nibble(InputString[1]);
+ InputString -= 2;
+ uint8_t n0 = Char2Nibble(InputString[0]);
+ uint8_t n1 = Char2Nibble(InputString[1]);
allBits |= n0 | n1;
*OutPtr = (n0 << 4) | n1;
OutPtr += 1;
- InputString += 2;
CharacterCount -= 2;
}
@@ -540,19 +579,43 @@ ParseHexBytes(const char* InputString, size_t CharacterCount, uint8_t* OutPtr)
}
inline void
-ToHexBytes(const uint8_t* InputData, size_t ByteCount, char* OutString)
+ToHexNumber(const uint8_t* InputData, size_t ByteCount, char* OutString)
{
- const char hexchars[] = "0123456789abcdef";
-
+ InputData += ByteCount;
while (ByteCount--)
{
- uint8_t byte = *InputData++;
+ uint8_t byte = *(--InputData);
- *OutString++ = hexchars[byte >> 4];
- *OutString++ = hexchars[byte & 15];
+ *OutString++ = HexChars[byte >> 4];
+ *OutString++ = HexChars[byte & 15];
}
}
+/// <summary>
+/// Generates a null-terminated hex number string from an unsigned integer value, this formats the number in the correct order for a hexadecimal number
+/// </summary>
+/// <param name="Value">Integer value type</param>
+/// <param name="OutString">Output buffer where resulting string is written, must have room for sizeof(Value) * 2 characters plus a terminating null</param>
+void
+ToHexNumber(std::unsigned_integral auto Value, char* OutString)
+{
+ ToHexNumber((const uint8_t*)&Value, sizeof(Value), OutString);
+ OutString[sizeof(Value) * 2] = 0;
+}
+
+/// <summary>
+/// Parse hex number string into an unsigned integer value, this reads the digits in the order written by ToHexNumber
+/// </summary>
+/// <param name="HexString">Input string</param>
+/// <remarks>The number of characters read from the input string is always sizeof(OutValue) * 2</remarks>
+/// <param name="OutValue">Reference to output value</param>
+/// <returns>true if the input consisted of all valid hexadecimal characters</returns>
+bool
+ParseHexNumber(const std::string HexString, std::unsigned_integral auto& OutValue)
+{
+ return ParseHexNumber(HexString.c_str(), sizeof(OutValue) * 2, (uint8_t*)&OutValue);
+}
+
//////////////////////////////////////////////////////////////////////////
// Format numbers for humans
//
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index e2aaa3169..8a3ab8427 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -186,7 +186,7 @@ IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, ui
, m_FileHandle(Outer->m_FileHandle)
, m_FileOffset(Outer->m_FileOffset + Offset)
{
- m_Flags.fetch_or(kIsOwnedByThis | kIsExtended, std::memory_order_relaxed);
+ m_Flags.fetch_or(kIsExtended, std::memory_order_relaxed);
}
IoBufferExtendedCore::~IoBufferExtendedCore()
@@ -217,10 +217,9 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
int Fd = int(uintptr_t(m_FileHandle));
bool Success = (close(Fd) == 0);
#endif
-
if (!Success)
{
- ZEN_WARN("Error reported on file handle close!");
+ ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString());
}
}
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 769167433..738e4c1fd 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -59,7 +59,11 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc), GcContributor(Gc), m_DiskLayer(RootDir)
+ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir)
+: GcStorage(Gc)
+, GcContributor(Gc)
+, m_RootDir(RootDir)
+, m_DiskLayer(RootDir)
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
CreateDirectories(RootDir);
@@ -188,6 +192,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx)
void
ZenCacheStore::GatherReferences(GcContext& GcCtx)
{
+ Stopwatch Timer;
+ const auto Guard = MakeGuard(
+ [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
access_tracking::AccessTimes AccessTimes;
m_MemLayer.GatherAccessTimes(AccessTimes);
@@ -476,25 +484,27 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
std::filesystem::path SobsPath{BucketDir / "zen.sobs"};
std::filesystem::path SlogPath{BucketDir / "zen.slog"};
- m_SobsFile.Open(SobsPath, IsNew);
- m_SlogFile.Open(SlogPath, IsNew);
+ m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
+ m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
- m_SlogFile.Replay([&](const DiskIndexEntry& Entry) {
- if (Entry.Key == IoHash::Zero)
- {
- ++InvalidEntryCount;
- }
- else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
- {
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- }
- else
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
- }
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
- });
+ m_SlogFile.Replay(
+ [&](const DiskIndexEntry& Entry) {
+ if (Entry.Key == IoHash::Zero)
+ {
+ ++InvalidEntryCount;
+ }
+ else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone))
+ {
+ m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ else
+ {
+ m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
+ m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size());
+ },
+ 0);
if (InvalidEntryCount)
{
@@ -757,6 +767,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences");
+ Stopwatch Timer;
+ const auto Guard = MakeGuard(
+ [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
const GcClock::TimePoint ExpireTime =
GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration();
@@ -905,8 +919,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
m_SlogFile.Close();
const bool IsNew = true;
- m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew);
- m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew);
+ m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
+ m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
m_SobsCursor = 0;
m_TotalSize = 0;
@@ -967,8 +981,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
uint64_t TmpCursor{};
std::vector<uint8_t> Chunk;
- TmpSobs.Open(TmpSobsPath, true);
- TmpLog.Open(TmpSlogPath, true);
+ TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate);
+ TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate);
for (const auto& Entry : ValidEntries)
{
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index b7fc18b4e..ac0f863cc 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -428,6 +428,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
"Max duration in seconds before Z$ entries get evicted.",
cxxopts::value<int32_t>(ServerOptions.GcConfig.Cache.MaxDurationSeconds)->default_value("86400"),
"");
+
+ options.add_option("gc",
+ "",
+ "disk-reserve-size",
+ "Size of gc disk reserve in bytes.",
+ cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"),
+ "");
try
{
auto result = options.parse(argc, argv);
@@ -699,6 +706,7 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (sol::optional<sol::table> GcConfig = lua["gc"])
{
ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
+ ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"])
{
diff --git a/zenserver/config.h b/zenserver/config.h
index a61a7f89f..9f1b3645c 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -91,6 +91,7 @@ struct ZenGcConfig
int32_t IntervalSeconds = 0;
bool CollectSmallObjects = true;
bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
};
struct ZenServerOptions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 58b806989..617f50660 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -8,6 +8,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/testing.h>
@@ -114,10 +115,10 @@ struct ProjectStore::OplogStorage : public RefCounted
CreateDirectories(m_OplogStoragePath);
}
- m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate);
+ m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
m_Oplog.Initialize();
- m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate);
+ m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
ZEN_ASSERT(IsPow2(m_OpsAlign));
ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
@@ -180,36 +181,39 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t InvalidEntries = 0;
- m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
- {
- ++InvalidEntries;
+ m_Oplog.Replay(
+ [&](const zen::OplogEntry& LogEntry) {
+ if (LogEntry.OpCoreSize == 0)
+ {
+ ++InvalidEntries;
- return;
- }
+ return;
+ }
- IoBuffer OpBuffer(LogEntry.OpCoreSize);
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
- m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF);
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF);
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- return;
- }
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ ZEN_WARN("skipping oplog entry with bad checksum!");
+ return;
+ }
- CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
- m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
- Handler(Op, LogEntry);
- });
+ Handler(Op, LogEntry);
+ },
+ 0);
if (InvalidEntries)
{
@@ -653,7 +657,7 @@ ProjectStore::Project::Read()
ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath);
BasicFile Blob;
- Blob.Open(ProjectStateFilePath, false);
+ Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead);
IoBuffer Obj = Blob.ReadAll();
CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
@@ -693,7 +697,7 @@ ProjectStore::Project::Write()
ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath);
BasicFile Blob;
- Blob.Open(ProjectStateFilePath, true);
+ Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate);
Blob.Write(Mem.Data(), Mem.Size(), 0);
Blob.Flush();
}
@@ -970,6 +974,10 @@ ProjectStore::Scrub(ScrubContext& Ctx)
void
ProjectStore::GatherReferences(GcContext& GcCtx)
{
+ Stopwatch Timer;
+ const auto Guard =
+ MakeGuard([this, &Timer] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+
DiscoverProjects();
RwLock::SharedLockScope _(m_ProjectsLock);
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 667bcd317..f81deb167 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -369,6 +369,7 @@ public:
.MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
.CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
.Enabled = ServerOptions.GcConfig.Enabled,
+ .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize,
};
m_GcScheduler.Initialize(GcConfig);
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index 895db6cee..e795b67eb 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -29,10 +29,10 @@ BasicFile::~BasicFile()
}
void
-BasicFile::Open(std::filesystem::path FileName, bool IsCreate)
+BasicFile::Open(const std::filesystem::path& FileName, Mode Mode)
{
std::error_code Ec;
- Open(FileName, IsCreate, Ec);
+ Open(FileName, Mode, Ec);
if (Ec)
{
@@ -41,22 +41,41 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate)
}
void
-BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec)
+BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec)
{
Ec.clear();
#if ZEN_PLATFORM_WINDOWS
- const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING;
- DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
- const DWORD dwShareMode = FILE_SHARE_READ;
- const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
- HANDLE hTemplateFile = nullptr;
-
- if (IsCreate)
+ DWORD dwCreationDisposition = 0;
+ DWORD dwDesiredAccess = 0;
+ switch (Mode)
{
- dwDesiredAccess |= DELETE;
+ case Mode::kRead:
+ dwCreationDisposition |= OPEN_EXISTING;
+ dwDesiredAccess |= GENERIC_READ;
+ break;
+ case Mode::kWrite:
+ dwCreationDisposition |= OPEN_ALWAYS;
+ dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE);
+ break;
+ case Mode::kDelete:
+ dwCreationDisposition |= OPEN_ALWAYS;
+ dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE);
+ break;
+ case Mode::kTruncate:
+ dwCreationDisposition |= CREATE_ALWAYS;
+ dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE);
+ break;
+ case Mode::kTruncateDelete:
+ dwCreationDisposition |= CREATE_ALWAYS;
+ dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE);
+ break;
}
+ const DWORD dwShareMode = FILE_SHARE_READ;
+ const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
+ HANDLE hTemplateFile = nullptr;
+
HANDLE FileHandle = CreateFile(FileName.c_str(),
dwDesiredAccess,
dwShareMode,
@@ -67,21 +86,34 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code&
if (FileHandle == INVALID_HANDLE_VALUE)
{
- Ec = zen::MakeErrorCodeFromLastError();
+ Ec = MakeErrorCodeFromLastError();
return;
}
#else
- int OpenFlags = O_RDWR | O_CLOEXEC;
- OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0;
+ int OpenFlags = O_CLOEXEC;
+ switch (Mode)
+ {
+ case Mode::kRead:
+ OpenFlags |= O_RDONLY;
+ break;
+ case Mode::kWrite:
+ case Mode::kDelete:
+ OpenFlags |= (O_RDWR | O_CREAT);
+ break;
+ case Mode::kTruncate:
+ case Mode::kTruncateDelete:
+ OpenFlags |= (O_RDWR | O_CREAT | O_TRUNC);
+ break;
+ }
int Fd = open(FileName.c_str(), OpenFlags, 0666);
if (Fd < 0)
{
- Ec = zen::MakeErrorCodeFromLastError();
+ Ec = MakeErrorCodeFromLastError();
return;
}
- if (IsCreate)
+ if (Mode != Mode::kRead)
{
fchmod(Fd, 0666);
}
@@ -268,7 +300,14 @@ BasicFile::FileSize()
#if ZEN_PLATFORM_WINDOWS
ULARGE_INTEGER liFileSize;
liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart);
-
+ if (liFileSize.LowPart == INVALID_FILE_SIZE)
+ {
+ int Error = zen::GetLastError();
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle)));
+ }
+ }
return uint64_t(liFileSize.QuadPart);
#else
int Fd = int(uintptr_t(m_FileHandle));
@@ -279,6 +318,102 @@ BasicFile::FileSize()
#endif
}
+void
+BasicFile::SetFileSize(uint64_t FileSize)
+{
+#if ZEN_PLATFORM_WINDOWS
+ LARGE_INTEGER liFileSize;
+ liFileSize.QuadPart = FileSize;
+ BOOL OK = ::SetFilePointerEx(m_FileHandle, liFileSize, 0, FILE_BEGIN);
+ if (OK == FALSE)
+ {
+ int Error = zen::GetLastError();
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+ OK = ::SetEndOfFile(m_FileHandle);
+ if (OK == FALSE)
+ {
+ int Error = zen::GetLastError();
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+#elif ZEN_PLATFORM_MAC
+ int Fd = int(intptr_t(m_FileHandle));
+ if (ftruncate(Fd, (off_t)FileSize) < 0)
+ {
+ int Error = zen::GetLastError();
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+ if (FileSize > 0)
+ {
+ int Error = posix_fallocate(Fd, 0, (off_t)FileSize);
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+#else
+ int Fd = int(intptr_t(m_FileHandle));
+ if (ftruncate64(Fd, (off64_t)FileSize) < 0)
+ {
+ int Error = zen::GetLastError();
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+ if (FileSize > 0)
+ {
+ int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize);
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ }
+ }
+#endif
+}
+
+void
+BasicFile::MarkAsDeleteOnClose(std::error_code& Ec)
+{
+ Ec.clear();
+#if ZEN_PLATFORM_WINDOWS
+ FILE_DISPOSITION_INFO Fdi{};
+ Fdi.DeleteFile = TRUE;
+ BOOL Success = SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+ if (!Success)
+ {
+ Ec = MakeErrorCodeFromLastError();
+ }
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ std::filesystem::path SourcePath = PathFromHandle(m_FileHandle);
+ if (unlink(SourcePath.c_str()) < 0)
+ {
+ int UnlinkError = zen::GetLastError();
+ if (UnlinkError != ENOENT)
+ {
+ Ec = MakeErrorCode(UnlinkError);
+ }
+ }
+#endif
+}
+
+void*
+BasicFile::Detach()
+{
+ void* FileHandle = m_FileHandle;
+ m_FileHandle = 0;
+ return FileHandle;
+}
+
//////////////////////////////////////////////////////////////////////////
TemporaryFile::~TemporaryFile()
@@ -314,9 +449,7 @@ TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_cod
m_TempPath = TempDirName / TempName.c_str();
- const bool IsCreate = true;
-
- Open(m_TempPath, IsCreate, Ec);
+ Open(m_TempPath, BasicFile::Mode::kTruncateDelete, Ec);
}
void
@@ -416,8 +549,8 @@ TEST_CASE("BasicFile")
ScopedCurrentDirectoryChange _;
BasicFile File1;
- CHECK_THROWS(File1.Open("zonk", false));
- CHECK_NOTHROW(File1.Open("zonk", true));
+ CHECK_THROWS(File1.Open("zonk", BasicFile::Mode::kRead));
+ CHECK_NOTHROW(File1.Open("zonk", BasicFile::Mode::kTruncate));
CHECK_NOTHROW(File1.Write("abcd", 4, 0));
CHECK(File1.FileSize() == 4);
{
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
new file mode 100644
index 000000000..1eb859d5a
--- /dev/null
+++ b/zenstore/blockstore.cpp
@@ -0,0 +1,242 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "compactcas.h"
+
+#include <zenstore/blockstore.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <algorithm>
+# include <random>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath)
+{
+}
+
+BlockStoreFile::~BlockStoreFile()
+{
+ m_IoBuffer = IoBuffer();
+ m_File.Detach();
+}
+
+const std::filesystem::path&
+BlockStoreFile::GetPath() const
+{
+ return m_Path;
+}
+
+void
+BlockStoreFile::Open()
+{
+ m_File.Open(m_Path, BasicFile::Mode::kDelete);
+ void* FileHandle = m_File.Handle();
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
+}
+
+void
+BlockStoreFile::Create(uint64_t InitialSize)
+{
+ auto ParentPath = m_Path.parent_path();
+ if (!std::filesystem::is_directory(ParentPath))
+ {
+ CreateDirectories(ParentPath);
+ }
+
+ m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete);
+ if (InitialSize > 0)
+ {
+ m_File.SetFileSize(InitialSize);
+ }
+ void* FileHandle = m_File.Handle();
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
+}
+
+uint64_t
+BlockStoreFile::FileSize()
+{
+ return m_File.FileSize();
+}
+
+void
+BlockStoreFile::MarkAsDeleteOnClose(std::error_code& Ec)
+{
+ m_File.MarkAsDeleteOnClose(Ec);
+}
+
+IoBuffer
+BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size)
+{
+ return IoBuffer(m_IoBuffer, Offset, Size);
+}
+
+void
+BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset)
+{
+ m_File.Read(Data, Size, FileOffset);
+}
+
+void
+BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
+{
+ m_File.Write(Data, Size, FileOffset);
+}
+
+void
+BlockStoreFile::Truncate(uint64_t Size)
+{
+ m_File.SetFileSize(Size);
+}
+
+void
+BlockStoreFile::Flush()
+{
+ m_File.Flush();
+}
+
+void
+BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
+{
+ m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
+}
+
+#if ZEN_WITH_TESTS
+
+static bool
+operator==(const BlockStoreLocation& Lhs, const BlockStoreLocation& Rhs)
+{
+ return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size;
+}
+
+TEST_CASE("blockstore.blockstoredisklocation")
+{
+ BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0};
+ CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4));
+
+ BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
+ CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4));
+
+ BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
+ CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4));
+
+ BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()};
+ CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4));
+
+ BlockStoreLocation MaxBlockIndexAndOffset =
+ BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0};
+ CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4));
+
+ BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
+ .Offset = BlockStoreDiskLocation::MaxOffset * 4,
+ .Size = std::numeric_limits<uint32_t>::max()};
+ CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4));
+
+ BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex,
+ .Offset = BlockStoreDiskLocation::MaxOffset * 4096,
+ .Size = std::numeric_limits<uint32_t>::max()};
+ CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096));
+
+ BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2,
+ .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4,
+ .Size = std::numeric_limits<uint32_t>::max() / 2};
+ CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4));
+}
+
+TEST_CASE("blockstore.blockfile")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto RootDirectory = TempDir.Path() / "blocks";
+ CreateDirectories(RootDirectory);
+
+ {
+ BlockStoreFile File1(RootDirectory / "1");
+ File1.Create(16384);
+ CHECK(File1.FileSize() == 16384);
+ File1.Write("data", 5, 0);
+ IoBuffer DataChunk = File1.GetChunk(0, 5);
+ File1.Write("boop", 5, 5);
+ IoBuffer BoopChunk = File1.GetChunk(5, 5);
+ const char* Data = static_cast<const char*>(DataChunk.GetData());
+ CHECK(std::string(Data) == "data");
+ const char* Boop = static_cast<const char*>(BoopChunk.GetData());
+ CHECK(std::string(Boop) == "boop");
+ File1.Flush();
+ }
+ {
+ BlockStoreFile File1(RootDirectory / "1");
+ File1.Open();
+
+ char DataRaw[5];
+ File1.Read(DataRaw, 5, 0);
+ CHECK(std::string(DataRaw) == "data");
+ IoBuffer DataChunk = File1.GetChunk(0, 5);
+
+ char BoopRaw[5];
+ File1.Read(BoopRaw, 5, 5);
+ CHECK(std::string(BoopRaw) == "boop");
+
+ IoBuffer BoopChunk = File1.GetChunk(5, 5);
+ const char* Data = static_cast<const char*>(DataChunk.GetData());
+ CHECK(std::string(Data) == "data");
+ const char* Boop = static_cast<const char*>(BoopChunk.GetData());
+ CHECK(std::string(Boop) == "boop");
+ }
+
+ {
+ IoBuffer DataChunk;
+ IoBuffer BoopChunk;
+
+ {
+ BlockStoreFile File1(RootDirectory / "1");
+ File1.Open();
+ DataChunk = File1.GetChunk(0, 5);
+ BoopChunk = File1.GetChunk(5, 5);
+ }
+
+ CHECK(std::filesystem::exists(RootDirectory / "1"));
+
+ const char* Data = static_cast<const char*>(DataChunk.GetData());
+ CHECK(std::string(Data) == "data");
+ const char* Boop = static_cast<const char*>(BoopChunk.GetData());
+ CHECK(std::string(Boop) == "boop");
+ }
+ CHECK(std::filesystem::exists(RootDirectory / "1"));
+
+ {
+ IoBuffer DataChunk;
+ IoBuffer BoopChunk;
+
+ {
+ BlockStoreFile File1(RootDirectory / "1");
+ File1.Open();
+ std::error_code Ec;
+ File1.MarkAsDeleteOnClose(Ec);
+ CHECK(!Ec);
+ DataChunk = File1.GetChunk(0, 5);
+ BoopChunk = File1.GetChunk(5, 5);
+ }
+
+ const char* Data = static_cast<const char*>(DataChunk.GetData());
+ CHECK(std::string(Data) == "data");
+ const char* Boop = static_cast<const char*>(BoopChunk.GetData());
+ CHECK(std::string(Boop) == "boop");
+ }
+ CHECK(!std::filesystem::exists(RootDirectory / "1"));
+}
+
+#endif
+
+void
+blockstore_forcelink()
+{
+}
+
+} // namespace zen
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index a90e45c04..0e1d5b242 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -150,8 +150,8 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
// Initialize payload storage
m_LargeStrategy.Initialize(IsNewStore);
- m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
- m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
+ m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block
+ m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block
}
bool
@@ -164,7 +164,7 @@ CasImpl::OpenOrCreateManifest()
std::error_code Ec;
BasicFile ManifestFile;
- ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec);
+ ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec);
bool ManifestIsOk = false;
@@ -236,7 +236,7 @@ CasImpl::UpdateManifest()
ZEN_TRACE("Writing new manifest to '{}'", ManifestPath);
BasicFile Marker;
- Marker.Open(ManifestPath.c_str(), /* IsCreate */ true);
+ Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate);
Marker.Write(m_ManifestObject.GetBuffer(), 0);
}
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index 055e3feda..03a56f010 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -39,13 +39,23 @@ CasLogFile::~CasLogFile()
}
void
-CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreate)
+CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode)
{
m_RecordSize = RecordSize;
std::error_code Ec;
- m_File.Open(FileName, IsCreate, Ec);
+ BasicFile::Mode FileMode = BasicFile::Mode::kRead;
+ switch (Mode)
+ {
+ case Mode::kWrite:
+ FileMode = BasicFile::Mode::kWrite;
+ break;
+ case Mode::kTruncate:
+ FileMode = BasicFile::Mode::kTruncate;
+ break;
+ }
+ m_File.Open(FileName, FileMode, Ec);
if (Ec)
{
throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName));
@@ -53,8 +63,12 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
uint64_t AppendOffset = 0;
- if (IsCreate || (m_File.FileSize() < sizeof(FileHeader)))
+ if ((Mode == Mode::kTruncate) || (m_File.FileSize() < sizeof(FileHeader)))
{
+ if (Mode == Mode::kRead)
+ {
+ throw std::runtime_error(fmt::format("Mangled log header (file to small) in '{}'", FileName));
+ }
// Initialize log by writing header
FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0};
memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic);
@@ -106,20 +120,36 @@ CasLogFile::GetLogSize()
return m_File.FileSize();
}
+uint64_t
+CasLogFile::GetLogCount()
+{
+ uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire);
+ if (LogFileSize < sizeof(FileHeader))
+ {
+ return 0;
+ }
+ const uint64_t LogBaseOffset = sizeof(FileHeader);
+ const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
+ return LogEntryCount;
+}
+
void
-CasLogFile::Replay(std::function<void(const void*)>&& Handler)
+CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount)
{
uint64_t LogFileSize = m_File.FileSize();
// Ensure we end up on a clean boundary
- const uint64_t LogBaseOffset = sizeof(FileHeader);
- const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
+ uint64_t LogBaseOffset = sizeof(FileHeader);
+ size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize;
- if (LogEntryCount == 0)
+ if (LogEntryCount <= SkipEntryCount)
{
return;
}
+ LogBaseOffset += SkipEntryCount * m_RecordSize;
+ LogEntryCount -= SkipEntryCount;
+
// This should really be streaming the data rather than just
// reading it into memory, though we don't tend to get very
// large logs so it may not matter
@@ -142,7 +172,7 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
void
CasLogFile::Append(const void* DataPointer, uint64_t DataSize)
{
- ZEN_ASSERT(DataSize == m_RecordSize);
+ ZEN_ASSERT((DataSize % m_RecordSize) == 0);
uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize);
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 8b53a8304..509d21abe 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -127,35 +127,37 @@ struct CidStore::Impl
bool IsNew = !std::filesystem::exists(SlogPath);
- m_LogFile.Open(SlogPath, IsNew);
+ m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize()));
uint64_t TombstoneCount = 0;
uint64_t InvalidCount = 0;
- m_LogFile.Replay([&](const IndexEntry& Entry) {
- if (Entry.Compressed != IoHash::Zero)
- {
- // Update
- m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
- }
- else
- {
- if (Entry.Uncompressed != IoHash::Zero)
+ m_LogFile.Replay(
+ [&](const IndexEntry& Entry) {
+ if (Entry.Compressed != IoHash::Zero)
{
- // Tombstone
- m_CidMap.erase(Entry.Uncompressed);
- ++TombstoneCount;
+ // Update
+ m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed);
}
else
{
- // Completely uninitialized entry with both hashes set to zero indicates a
- // problem. Might be an unwritten page due to BSOD or some other problem
- ++InvalidCount;
+ if (Entry.Uncompressed != IoHash::Zero)
+ {
+ // Tombstone
+ m_CidMap.erase(Entry.Uncompressed);
+ ++TombstoneCount;
+ }
+ else
+ {
+ // Completely uninitialized entry with both hashes set to zero indicates a
+ // problem. Might be an unwritten page due to BSOD or some other problem
+ ++InvalidCount;
+ }
}
- }
- });
+ },
+ 0);
ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount);
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 3bf0c70df..51fe7a901 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -1,28 +1,24 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenstore/cas.h>
-
#include "compactcas.h"
-#include <zencore/compactbinarybuilder.h>
+#include <zenstore/cas.h>
+
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
-#include <zencore/memory.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-
-#include <zenstore/gc.h>
-
-#include <filesystem>
-#include <functional>
+#include <zencore/scopeguard.h>
#include <gsl/gsl-lite.hpp>
+#include <xxhash.h>
+
#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
+# include <zenstore/cidstore.h>
# include <algorithm>
# include <random>
#endif
@@ -31,6 +27,211 @@
namespace zen {
+struct CasDiskIndexHeader
+{
+ static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx';
+ static constexpr uint32_t CurrentVersion = 1;
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint64_t EntryCount = 0;
+ uint64_t LogPosition = 0;
+ uint32_t PayloadAlignment = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+};
+
+static_assert(sizeof(CasDiskIndexHeader) == 32);
+
+namespace {
+ std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks,
+ const std::vector<IoHash>& DeletedChunks)
+ {
+ std::vector<CasDiskIndexEntry> result;
+ result.reserve(MovedChunks.size());
+ for (const auto& MovedEntry : MovedChunks)
+ {
+ result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second});
+ }
+ for (const IoHash& ChunkHash : DeletedChunks)
+ {
+ result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone});
+ }
+ return result;
+ }
+
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".ulog";
+ const char* DataExtension = ".ucas";
+
+ std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / ContainerBaseName;
+ }
+
+ std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension);
+ }
+
+ std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension);
+ }
+
+ std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension);
+ }
+
+ std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return GetBasePath(RootPath, ContainerBaseName) / "blocks";
+ }
+
+ std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+ {
+ ExtendablePathBuilder<256> Path;
+
+ char BlockHexString[9];
+ ToHexNumber(BlockIndex, BlockHexString);
+
+ Path.Append(BlocksBasePath);
+ Path.AppendSeparator();
+ Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+ Path.AppendSeparator();
+ Path.Append(BlockHexString);
+ Path.Append(DataExtension);
+ return Path.ToPath();
+ }
+
+ std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + LogExtension);
+ }
+
+ std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + DataExtension);
+ }
+
+ std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName)
+ {
+ return RootPath / (ContainerBaseName + IndexExtension);
+ }
+
+ struct LegacyCasDiskLocation
+ {
+ LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize)
+ {
+ ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
+ ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
+
+ memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
+ memcpy(&m_Size[0], &InSize, sizeof m_Size);
+ }
+
+ LegacyCasDiskLocation() = default;
+
+ inline uint64_t GetOffset() const
+ {
+ uint64_t Offset = 0;
+ memcpy(&Offset, &m_Offset, sizeof m_Offset);
+ return Offset;
+ }
+
+ inline uint64_t GetSize() const
+ {
+ uint64_t Size = 0;
+ memcpy(&Size, &m_Size, sizeof m_Size);
+ return Size;
+ }
+
+ private:
+ uint8_t m_Offset[5];
+ uint8_t m_Size[5];
+ };
+
+ struct LegacyCasDiskIndexEntry
+ {
+ static const uint8_t kTombstone = 0x01;
+
+ IoHash Key;
+ LegacyCasDiskLocation Location;
+ ZenContentType ContentType = ZenContentType::kUnknownContentType;
+ uint8_t Flags = 0;
+ };
+
+ bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0)
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ return true;
+ }
+ if (Entry.ContentType != ZenContentType::kUnknownContentType)
+ {
+ OutReason =
+ fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.GetSize();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+ bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason)
+ {
+ if (Entry.Key == IoHash::Zero)
+ {
+ OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+ return false;
+ }
+ if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0)
+ {
+ OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+ return false;
+ }
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ return true;
+ }
+ if (Entry.ContentType != ZenContentType::kUnknownContentType)
+ {
+ OutReason =
+ fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString());
+ return false;
+ }
+ uint64_t Size = Entry.Location.GetSize();
+ if (Size == 0)
+ {
+ OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+ return false;
+ }
+ return true;
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
: GcStorage(Gc)
, m_Config(Config)
@@ -43,13 +244,16 @@ CasContainerStrategy::~CasContainerStrategy()
}
void
-CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore)
+CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
+ ZEN_ASSERT(MaxBlockSize > 0);
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
+ m_MaxBlockSize = MaxBlockSize;
+ m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName);
OpenContainer(IsNewStore);
@@ -59,36 +263,79 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
+ uint32_t WriteBlockIndex;
+ Ref<BlockStoreFile> WriteBlock;
+ uint64_t InsertOffset;
{
- RwLock::SharedLockScope _(m_LocationMapLock);
- auto KeyIt = m_LocationMap.find(ChunkHash);
+ RwLock::ExclusiveLockScope _(m_InsertLock);
- if (KeyIt != m_LocationMap.end())
{
- return CasStore::InsertResult{.New = false};
+ RwLock::SharedLockScope __(m_LocationMapLock);
+ if (m_LocationMap.contains(ChunkHash))
+ {
+ return CasStore::InsertResult{.New = false};
+ }
}
- }
-
- // New entry
-
- RwLock::ExclusiveLockScope _(m_InsertLock);
- const uint64_t InsertOffset = m_CurrentInsertOffset;
- m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset);
+ // New entry
- m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
-
- RwLock::ExclusiveLockScope __(m_LocationMapLock);
-
- const CasDiskLocation Location{InsertOffset, ChunkSize};
-
- m_LocationMap[ChunkHash] = Location;
-
- CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
+ WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ bool IsWriting = m_WriteBlock != nullptr;
+ if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize)
+ {
+ if (m_WriteBlock)
+ {
+ m_WriteBlock = nullptr;
+ }
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ throw std::runtime_error(
+ fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName));
+ }
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ m_WriteBlock = new BlockStoreFile(BlockPath);
+ m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ }
+ m_CurrentInsertOffset = 0;
+ m_WriteBlock->Create(m_MaxBlockSize);
+ }
+ InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment);
+ WriteBlock = m_WriteBlock;
+ }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
+ // We can end up in a situation where InsertChunk writes the same chunk data to
+ // different locations.
+ // We release the insert lock once we have the correct WriteBlock ready and we know
+ // where to write the data. If a new InsertChunk request for the same chunk hash/data
+ // comes in before we update m_LocationMap below, we will have a race.
+ // The outcome is that the chunk data is written to more than one location
+ // but the chunk hash will only point to one of the copies.
+ // In that case we waste space until the next GC operation.
+ //
+ // This should be a rare occurrence, and the current flow reduces the time we block
+ // reads, inserts and GC.
+
+ BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
+
+ WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
m_CasLog.Append(IndexEntry);
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_release);
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, Location);
+ }
+
return CasStore::InsertResult{.New = true};
}
@@ -101,31 +348,28 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ Ref<BlockStoreFile> ChunkBlock;
+ BlockStoreLocation Location;
{
- const CasDiskLocation& Location = KeyIt->second;
-
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
+ RwLock::SharedLockScope _(m_LocationMapLock);
+ if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ {
+ Location = KeyIt->second.Get(m_PayloadAlignment);
+ ChunkBlock = m_ChunkBlocks[Location.BlockIndex];
+ }
+ else
+ {
+ return IoBuffer();
+ }
}
-
- // Not found
-
- return IoBuffer();
+ return ChunkBlock->GetChunk(Location.Offset, Location.Size);
}
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
-
- if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
- {
- return true;
- }
-
- return false;
+ return m_LocationMap.contains(ChunkHash);
}
void
@@ -144,20 +388,23 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- m_CasLog.Flush();
- m_SmallObjectIndex.Flush();
- m_SmallObjectFile.Flush();
+ {
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
+ {
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
+ }
+ }
+ MakeIndexSnapshot();
}
void
CasContainerStrategy::Scrub(ScrubContext& Ctx)
{
- const uint64_t WindowSize = 4 * 1024 * 1024;
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const uint64_t FileSize = m_SmallObjectFile.FileSize();
-
- std::vector<CasDiskIndexEntry> BigChunks;
std::vector<CasDiskIndexEntry> BadChunks;
// We do a read sweep through the payloads file and validate
@@ -166,62 +413,73 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
// pass. An alternative strategy would be to use memory mapping.
{
- IoBuffer ReadBuffer{WindowSize};
- void* BufferBase = ReadBuffer.MutableData();
+ std::vector<CasDiskIndexEntry> BigChunks;
+ const uint64_t WindowSize = 4 * 1024 * 1024;
+ IoBuffer ReadBuffer{WindowSize};
+ void* BufferBase = ReadBuffer.MutableData();
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time?
+ RwLock::SharedLockScope __(m_LocationMapLock);
- do
+ for (const auto& Block : m_ChunkBlocks)
{
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
+ const Ref<BlockStoreFile>& BlockFile = Block.second;
+ BlockFile->Open();
+ const uint64_t FileSize = BlockFile->FileSize();
- for (auto& Entry : m_LocationMap)
+ do
{
- const uint64_t EntryOffset = Entry.second.GetOffset();
+ const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
+ BlockFile->Read(BufferBase, ChunkSize, WindowStart);
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+ for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
+ const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment);
+ const uint64_t EntryOffset = Location.Offset;
- if (EntryEnd >= WindowEnd)
+ if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ const uint64_t EntryEnd = EntryOffset + Location.Size;
- continue;
- }
+ if (EntryEnd >= WindowEnd)
+ {
+ BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
- const IoHash ComputedHash =
- IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
- Entry.second.GetSize());
+ continue;
+ }
- if (Entry.first != ComputedHash)
- {
- // Hash mismatch
+ const IoHash ComputedHash =
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size);
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ if (Entry.first != ComputedHash)
+ {
+ // Hash mismatch
+ BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone});
+ }
}
}
- }
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
- }
-
- // Deal with large chunks
+ WindowStart += WindowSize;
+ WindowEnd += WindowSize;
+ } while (WindowStart < FileSize);
+ }
- for (const CasDiskIndexEntry& Entry : BigChunks)
- {
- IoHashStream Hasher;
- m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
- Hasher.Append(Data, Size);
- });
- IoHash ComputedHash = Hasher.GetHash();
+ // Deal with large chunks
- if (Entry.Key != ComputedHash)
+ for (const CasDiskIndexEntry& Entry : BigChunks)
{
- BadChunks.push_back(Entry);
+ IoHashStream Hasher;
+ const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
+ const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
+ BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ IoHash ComputedHash = Hasher.GetHash();
+
+ if (Entry.Key != ComputedHash)
+ {
+ BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
+ }
}
}
@@ -230,17 +488,21 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
return;
}
- ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName);
+ ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName);
// Deal with bad chunks by removing them from our lookup map
std::vector<IoHash> BadChunkHashes;
+ BadChunkHashes.reserve(BadChunks.size());
- for (const CasDiskIndexEntry& Entry : BadChunks)
+ m_CasLog.Append(BadChunks);
{
- BadChunkHashes.push_back(Entry.Key);
- m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone});
- m_LocationMap.erase(Entry.Key);
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ for (const CasDiskIndexEntry& Entry : BadChunks)
+ {
+ BadChunkHashes.push_back(Entry.Key);
+ m_LocationMap.erase(Entry.Key);
+ }
}
// Let whomever it concerns know about the bad chunks. This could
@@ -253,243 +515,1106 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
void
CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
{
- namespace fs = std::filesystem;
-
- // A naive garbage collection implementation that just copies evicted chunks
- // into a new container file. We probably need to partition the container file
- // into several parts to prevent needing to keep the entire container file during GC.
+ // It collects all the blocks that we want to delete chunks from. For each such
+ // block we keep a list of chunks to retain and a list of chunks to delete.
+ //
+ // If there is a block that we are currently writing to, that block is omitted
+ // from the garbage collection.
+ //
+ // Next it will iterate over all blocks that we want to remove chunks from.
+ // If the block is empty after removal of chunks we mark the block as pending
+ // delete - we want to delete it as soon as there are no IoBuffers using the
+ // block file.
+ // Once complete we update the m_LocationMap by removing the chunks.
+ //
+ // If the block is non-empty we write out the chunks we want to keep to a new
+ // block file (creating new block files as needed).
+ //
+ // We update the index as we complete each new block file. This makes it possible
+ // to break the GC if we want to limit time for execution.
+ //
+ // GC can run largely in parallel with regular operation - it blocks while taking
+ // a snapshot of the current m_LocationMap state.
+ //
+ // While moving chunks it takes the m_LocationMap lock to update the map after each
+ // new block is written and to figure out the path to the next new block.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ uint64_t WriteBlockTimeUs = 0;
+ uint64_t WriteBlockLongestTimeUs = 0;
+ uint64_t ReadBlockTimeUs = 0;
+ uint64_t ReadBlockLongestTimeUs = 0;
+ uint64_t TotalChunkCount = 0;
+ uint64_t DeletedSize = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ std::vector<IoHash> DeletedChunks;
+ uint64_t MovedCount = 0;
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([this,
+ &TotalTimer,
+ &WriteBlockTimeUs,
+ &WriteBlockLongestTimeUs,
+ &ReadBlockTimeUs,
+ &ReadBlockLongestTimeUs,
+ &TotalChunkCount,
+ &DeletedChunks,
+ &MovedCount,
+ &DeletedSize,
+ OldTotalSize] {
+ ZEN_INFO(
+ "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+ "#{} "
+ "of #{} "
+ "chunks ({}).",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ NiceLatencyNs(WriteBlockTimeUs),
+ NiceLatencyNs(WriteBlockLongestTimeUs),
+ NiceLatencyNs(ReadBlockTimeUs),
+ NiceLatencyNs(ReadBlockLongestTimeUs),
+ NiceBytes(DeletedSize),
+ DeletedChunks.size(),
+ MovedCount,
+ TotalChunkCount,
+ NiceBytes(OldTotalSize));
+ });
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ LocationMap_t LocationMap;
+ size_t BlockCount;
+ uint64_t ExcludeBlockIndex = 0x800000000ull;
+ {
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
+ {
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_WriteBlock)
+ {
+ ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ }
+ __.ReleaseNow();
+ }
+ LocationMap = m_LocationMap;
+ BlockCount = m_ChunkBlocks.size();
+ }
+
+ if (LocationMap.empty())
+ {
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName);
+ return;
+ }
- Flush();
+ TotalChunkCount = LocationMap.size();
- std::vector<IoHash> Candidates;
- std::vector<IoHash> ChunksToKeep;
- std::vector<IoHash> ChunksToDelete;
- const uint64_t ChunkCount = m_LocationMap.size();
- uint64_t TotalSize{};
+ std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+ std::vector<std::vector<IoHash>> KeepChunks;
+ std::vector<std::vector<IoHash>> DeleteChunks;
- Candidates.reserve(m_LocationMap.size());
+ BlockIndexToChunkMapIndex.reserve(BlockCount);
+ KeepChunks.reserve(BlockCount);
+ DeleteChunks.reserve(BlockCount);
+ size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
- for (auto& Entry : m_LocationMap)
+ std::vector<IoHash> TotalChunkHashes;
+ TotalChunkHashes.reserve(TotalChunkCount);
+ for (const auto& Entry : LocationMap)
{
- Candidates.push_back(Entry.first);
- TotalSize += Entry.second.GetSize();
+ TotalChunkHashes.push_back(Entry.first);
}
- ChunksToKeep.reserve(Candidates.size());
- GcCtx.FilterCas(Candidates, [&ChunksToKeep, &ChunksToDelete](const IoHash& Hash, bool Keep) {
+ uint64_t DeleteCount = 0;
+
+ uint64_t NewTotalSize = 0;
+ GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreDiskLocation& Location = KeyIt->second;
+ uint32_t BlockIndex = Location.GetBlockIndex();
+
+ if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex)
+ {
+ return;
+ }
+
+ auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex);
+ size_t ChunkMapIndex = 0;
+ if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+ {
+ ChunkMapIndex = KeepChunks.size();
+ BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex;
+ KeepChunks.resize(ChunkMapIndex + 1);
+ KeepChunks.back().reserve(GuesstimateCountPerBlock);
+ DeleteChunks.resize(ChunkMapIndex + 1);
+ DeleteChunks.back().reserve(GuesstimateCountPerBlock);
+ }
+ else
+ {
+ ChunkMapIndex = BlockIndexPtr->second;
+ }
if (Keep)
{
- ChunksToKeep.push_back(Hash);
+ std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ NewTotalSize += Location.GetSize();
}
else
{
- ChunksToDelete.push_back(Hash);
+ std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ ChunkMap.push_back(ChunkHash);
+ DeleteCount++;
}
});
- if (m_LocationMap.empty() || ChunksToKeep.size() == m_LocationMap.size())
+ std::unordered_set<uint32_t> BlocksToReWrite;
+ BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+ for (const auto& Entry : BlockIndexToChunkMapIndex)
{
- ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete",
- ChunkCount,
- NiceBytes(TotalSize),
- m_Config.RootDirectory / m_ContainerBaseName);
+ uint32_t BlockIndex = Entry.first;
+ size_t ChunkMapIndex = Entry.second;
+ const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex];
+ if (ChunkMap.empty())
+ {
+ continue;
+ }
+ BlocksToReWrite.insert(BlockIndex);
+ }
+
+ const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ if (!PerformDelete)
+ {
+ uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ DeleteCount,
+ NiceBytes(TotalSize - NewTotalSize),
+ TotalChunkCount,
+ NiceBytes(TotalSize));
return;
}
- const uint64_t NewChunkCount = ChunksToKeep.size();
- uint64_t NewTotalSize = 0;
+ // Move all chunks in blocks that have chunks removed to new blocks
+
+ Ref<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = 0;
+ uint32_t NewBlockIndex = 0;
+ DeletedChunks.reserve(DeleteCount);
- for (const IoHash& Key : ChunksToKeep)
+ auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) {
+ for (const CasDiskIndexEntry& Entry : Entries)
+ {
+ if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ auto KeyIt = m_LocationMap.find(Entry.Key);
+ uint64_t ChunkSize = KeyIt->second.GetSize();
+ m_TotalSize.fetch_sub(ChunkSize);
+ m_LocationMap.erase(KeyIt);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = Entry.Location;
+ }
+ };
+
+ std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
+ for (uint32_t BlockIndex : BlocksToReWrite)
{
- const CasDiskLocation& Loc = m_LocationMap[Key];
- NewTotalSize += Loc.GetSize();
+ const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+ Ref<BlockStoreFile> OldBlockFile;
+ {
+ RwLock::SharedLockScope _i(m_LocationMapLock);
+ OldBlockFile = m_ChunkBlocks[BlockIndex];
+ }
+
+ const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex];
+ if (KeepMap.empty())
+ {
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap);
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+ {
+ RwLock::ExclusiveLockScope _i(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'",
+ m_ContainerBaseName,
+ BlockIndex,
+ OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ continue;
+ }
+
+ std::vector<uint8_t> Chunk;
+ for (const IoHash& ChunkHash : KeepMap)
+ {
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+ if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+ {
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {});
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ }
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return;
+ }
+ while (m_ChunkBlocks.contains(NextBlockIndex))
+ {
+ NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
+ }
+ std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+ }
+
+ MovedCount += MovedBlockChunks.size();
+ MovedBlockChunks.clear();
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
+ {
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
+ return;
+ }
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ RwLock::ExclusiveLockScope _l(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ m_ChunkBlocks.erase(NextBlockIndex);
+ return;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ }
+
+ NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+ MovedBlockChunks.emplace(
+ ChunkHash,
+ BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment));
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
+ }
+ Chunk.clear();
+ if (NewBlockFile)
+ {
+ NewBlockFile->Truncate(WriteOffset);
+ NewBlockFile->Flush();
+ NewBlockFile = {};
+ }
+
+ const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex];
+ std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap);
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
+ {
+ RwLock::ExclusiveLockScope __(m_LocationMapLock);
+ Stopwatch Timer;
+ const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ });
+ UpdateLocations(LogEntries);
+ m_ChunkBlocks[BlockIndex] = nullptr;
+ }
+ MovedCount += MovedBlockChunks.size();
+ DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end());
+ MovedBlockChunks.clear();
+
+ ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath());
+ std::error_code Ec;
+ OldBlockFile->MarkAsDeleteOnClose(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+ }
+ OldBlockFile = nullptr;
}
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
- if (Error)
+ for (const IoHash& ChunkHash : DeletedChunks)
{
- ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message());
- return;
+ DeletedSize += LocationMap[ChunkHash].GetSize();
}
- if (Space.Free < NewTotalSize + (64 << 20))
- {
- ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}",
+ GcCtx.DeletedCas(DeletedChunks);
+}
+
+// Writes a snapshot of the current location map to the index file so that a later start
+// only has to replay the log entries appended after the recorded log position.
+// An existing index is first moved to the temp index path; it is moved back if writing the
+// new snapshot throws, and whatever is left at the temp path is removed before returning.
+void
+CasContainerStrategy::MakeIndexSnapshot()
+{
+ ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+ uint64_t EntryCount = 0;
+ Stopwatch Timer;
+ // Logs the outcome on every exit path; EntryCount stays 0 if the write failed
+ const auto _ = MakeGuard([this, &EntryCount, &Timer] {
+ ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}",
m_Config.RootDirectory / m_ContainerBaseName,
- NiceBytes(NewTotalSize),
- NiceBytes(Space.Free));
- return;
- }
+ EntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects();
+ namespace fs = std::filesystem;
+
+ fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
- if (!CollectSmallObjects)
+ // Move index away, we keep it if something goes wrong
+ if (fs::is_regular_file(TempIndexPath))
{
- ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
- m_Config.RootDirectory / m_ContainerBaseName,
- ChunkCount - NewChunkCount,
- NiceBytes(TotalSize - NewTotalSize),
- ChunkCount,
- NiceBytes(TotalSize));
- return;
+ fs::remove(TempIndexPath);
+ }
+ if (fs::is_regular_file(IndexPath))
+ {
+ fs::rename(IndexPath, TempIndexPath);
}
- fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas");
- fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog");
-
+ try
{
- ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath);
+ m_CasLog.Flush();
+
+ // Write the current state of the location map to a new index state
+ uint64_t LogCount = 0;
+ std::vector<CasDiskIndexEntry> Entries;
- TCasLogFile<CasDiskIndexEntry> TmpLog;
- BasicFile TmpObjectFile;
- bool IsNew = true;
+ {
+ // Both locks are held while the map is copied and the log count is sampled
+ // (presumably so the two describe the same state - confirm against the insert path)
+ RwLock::SharedLockScope __(m_InsertLock);
+ RwLock::SharedLockScope ___(m_LocationMapLock);
+ Entries.resize(m_LocationMap.size());
- TmpLog.Open(TmpSlogPath, IsNew);
- TmpObjectFile.Open(TmpSobsPath, IsNew);
+ uint64_t EntryIndex = 0;
+ for (auto& Entry : m_LocationMap)
+ {
+ CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
+ IndexEntry.Key = Entry.first;
+ IndexEntry.Location = Entry.second;
+ }
- std::vector<uint8_t> Chunk;
- uint64_t NextInsertOffset{};
+ LogCount = m_CasLog.GetLogCount();
+ }
- for (const IoHash& Key : ChunksToKeep)
- {
- const auto Entry = m_LocationMap.find(Key);
- const auto& Loc = Entry->second;
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+ CasDiskIndexHeader Header = {.EntryCount = Entries.size(),
+ .LogPosition = LogCount,
+ .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)};
+
+ Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header);
- Chunk.resize(Loc.GetSize());
- m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset());
+ // The header occupies sizeof(CasDiskIndexHeader) bytes and the entries follow directly
+ // after it; this must match the offset ReadIndexFile() reads the entries from
+ ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexHeader), 0);
+ ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ ObjectIndexFile.Flush();
+ ObjectIndexFile.Close();
+ EntryCount = Entries.size();
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
- const uint64_t InsertOffset = NextInsertOffset;
- TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset);
- TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}});
+ // Restore any previous snapshot
- NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
+ if (fs::is_regular_file(TempIndexPath))
+ {
+ fs::remove(IndexPath);
+ fs::rename(TempIndexPath, IndexPath);
}
}
+ // On success the parked previous index is no longer needed
+ if (fs::is_regular_file(TempIndexPath))
+ {
+ fs::remove(TempIndexPath);
+ }
+}
- try
+// Loads the index snapshot, if one exists, into m_LocationMap.
+// Returns the log position stored in the snapshot header (the number of log entries that
+// are already reflected in the snapshot), or 0 when there is no index file, the file is
+// too small to hold a header, or the header fails validation.
+uint64_t
+CasContainerStrategy::ReadIndexFile()
+{
+ std::vector<CasDiskIndexEntry> Entries;
+ std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(IndexPath))
{
- CloseContainer();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Entries, &Timer] {
+ ZEN_INFO("read store '{}' index containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Entries.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
- fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
- fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
- fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+ BasicFile ObjectIndexFile;
+ ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+ uint64_t Size = ObjectIndexFile.FileSize();
+ if (Size >= sizeof(CasDiskIndexHeader))
+ {
+ // Upper bound on how many entries can physically follow the header in this file
+ uint64_t ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
+ CasDiskIndexHeader Header;
+ ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+ if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) &&
+ (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) &&
+ (Header.EntryCount <= ExpectedEntryCount))
+ {
+ Entries.resize(Header.EntryCount);
+ ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+ // The alignment stored in the snapshot replaces the currently configured one
+ m_PayloadAlignment = Header.PayloadAlignment;
- fs::remove(SobsPath);
- fs::remove(SidxPath);
- fs::remove(SlogPath);
+ std::string InvalidEntryReason;
+ for (const CasDiskIndexEntry& Entry : Entries)
+ {
+ if (!ValidateEntry(Entry, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+ continue;
+ }
+ m_LocationMap[Entry.Key] = Entry.Location;
+ }
- fs::rename(TmpSobsPath, SobsPath);
- fs::rename(TmpSlogPath, SlogPath);
+ return Header.LogPosition;
+ }
+ else
+ {
+ ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+ }
+ }
+ }
+ return 0;
+}
+
+// Replays the store log into m_LocationMap, skipping the first SkipEntryCount entries
+// (those are expected to already be covered by the index snapshot).
+// Tombstone records erase their key, invalid records are skipped with a warning.
+// Returns the number of log entries replayed, or 0 if the log is missing or fails to initialize.
+uint64_t
+CasContainerStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+ // Number of entries replayed, reported by the guard below
+ uint64_t LogEntryCount = 0;
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &LogEntryCount, &Timer] {
+ ZEN_INFO("read store '{}' log containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ LogEntryCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+ if (CasLog.Initialize())
{
- // Create a new empty index file
- BasicFile SidxFile;
- SidxFile.Open(SidxPath, true);
+ uint64_t EntryCount = CasLog.GetLogCount();
+ if (EntryCount < SkipEntryCount)
+ {
+ // The snapshot claims more log entries than the log holds, so replay everything
+ ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+ SkipEntryCount = 0;
+ }
+ uint64_t ReadCount = EntryCount - SkipEntryCount;
+ LogEntryCount = ReadCount;
+ CasLog.Replay(
+ [&](const CasDiskIndexEntry& Record) {
+ if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ {
+ m_LocationMap.erase(Record.Key);
+ return;
+ }
+ std::string InvalidEntryReason;
+ if (!ValidateEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason);
+ return;
+ }
+ m_LocationMap[Record.Key] = Record.Location;
+ },
+ SkipEntryCount);
+ return ReadCount;
}
+ }
+ return 0;
+}
- OpenContainer(false /* IsNewStore */);
+// Migrates a legacy single-file container (legacy log + legacy data file) into block files
+// and appends the migrated chunk locations to the new store log and to m_LocationMap.
+//
+// CleanSource - when true the legacy files are opened for writing: migrated chunks are
+//               tombstoned in the legacy log, the legacy data file is shrunk as blocks are
+//               copied out, and all legacy files are reset to zero length at the end.
+//               When false the legacy files are only read.
+//
+// Returns the number of migrated chunks; 0 when there is nothing to migrate or when the
+// migration is aborted (disk space query failed, not enough disk space, too many blocks).
+uint64_t
+CasContainerStrategy::MigrateLegacyData(bool CleanSource)
+{
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
- GcCtx.DeletedCas(ChunksToDelete);
+ if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
+ {
+ return 0;
+ }
- ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}",
+ ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName);
+
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ uint64_t MigratedChunkCount = 0;
+ uint32_t MigratedBlockCount = 0;
+ Stopwatch MigrationTimer;
+ uint64_t TotalSize = 0;
+ const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] {
+ ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
m_Config.RootDirectory / m_ContainerBaseName,
- ChunkCount - NewChunkCount,
- NiceBytes(TotalSize - NewTotalSize),
- ChunkCount,
+ MigratedChunkCount,
+ MigratedBlockCount,
+ NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
NiceBytes(TotalSize));
+ });
+
+ // Find the first block index that has no block file on disk yet
+ uint32_t WriteBlockIndex = 0;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
}
- catch (std::exception& Err)
+
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error);
+ if (Error)
{
- ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what());
+ ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message());
+ return 0;
+ }
+
+ if (Space.Free < m_MaxBlockSize)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free));
+ return 0;
+ }
+
+ BasicFile BlockFile;
+ BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- // Something went wrong, try create a new container
- OpenContainer(true /* IsNewStore */);
+ std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
+ uint64_t InvalidEntryCount = 0;
- GcCtx.DeletedCas(ChunksToDelete);
- GcCtx.DeletedCas(ChunksToKeep);
+ TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
+ LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
+ {
+ Stopwatch Timer;
+ const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] {
+ ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ LegacyDiskIndex.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+ if (LegacyCasLog.Initialize())
+ {
+ // Rebuild the legacy index by replaying the complete legacy log
+ LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
+ LegacyCasLog.Replay(
+ [&](const LegacyCasDiskIndexEntry& Record) {
+ std::string InvalidEntryReason;
+ if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone)
+ {
+ LegacyDiskIndex.erase(Record.Key);
+ return;
+ }
+ if (!ValidateLegacyEntry(Record, InvalidEntryReason))
+ {
+ ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
+ InvalidEntryCount++;
+ return;
+ }
+ LegacyDiskIndex.insert_or_assign(Record.Key, Record);
+ },
+ 0);
+
+ // Drop entries whose payload would extend past the end of the legacy data file
+ std::vector<IoHash> BadEntries;
+ uint64_t BlockFileSize = BlockFile.FileSize();
+ for (const auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+ if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize)
+ {
+ continue;
+ }
+ ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
+ BadEntries.push_back(Entry.first);
+ }
+ for (const IoHash& BadHash : BadEntries)
+ {
+ LegacyDiskIndex.erase(BadHash);
+ }
+ InvalidEntryCount += BadEntries.size();
+ }
}
-}
-void
-CasContainerStrategy::MakeSnapshot()
-{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ if (InvalidEntryCount)
+ {
+ ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName);
+ }
- std::vector<CasDiskIndexEntry> Entries{m_LocationMap.size()};
+ if (LegacyDiskIndex.empty())
+ {
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
+ // a CAS manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
+ }
+ return 0;
+ }
- uint64_t EntryIndex = 0;
- for (auto& Entry : m_LocationMap)
+ for (const auto& Entry : LegacyDiskIndex)
{
- CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++];
- IndexEntry.Key = Entry.first;
- IndexEntry.Location = Entry.second;
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
+ TotalSize += Record.Location.GetSize();
}
- m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0);
-}
+ // Worst case estimate: every chunk may need up to (alignment - 1) bytes of padding
+ uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size());
+ uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize;
+ if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex)
+ {
+ ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ MaxRequiredBlockCount,
+ BlockStoreDiskLocation::MaxBlockIndex);
+ return 0;
+ }
-void
-CasContainerStrategy::OpenContainer(bool IsNewStore)
-{
- std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas");
- std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx");
- std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog");
+ constexpr const uint64_t DiskReserve = 1ul << 28;
- m_SmallObjectFile.Open(SobsPath, IsNewStore);
- m_SmallObjectIndex.Open(SidxPath, IsNewStore);
- m_CasLog.Open(SlogPath, IsNewStore);
+ // With CleanSource the legacy file shrinks while blocks are written, so only one block
+ // plus the reserve has to fit; otherwise the complete copy has to fit next to the source
+ if (CleanSource)
+ {
+ if (Space.Free < (m_MaxBlockSize + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(m_MaxBlockSize + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
+ else
+ {
+ if (Space.Free < (RequiredDiskSpace + DiskReserve))
+ {
+ ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ NiceBytes(RequiredDiskSpace + DiskReserve),
+ NiceBytes(Space.Free));
+ return 0;
+ }
+ }
- // TODO: should validate integrity of container files here
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ CreateDirectories(LogPath.parent_path());
+ TCasLogFile<CasDiskIndexEntry> CasLog;
+ CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
- m_CurrentInsertOffset = 0;
- m_CurrentIndexOffset = 0;
- m_TotalSize = 0;
+ if (CleanSource && (MaxRequiredBlockCount < 2))
+ {
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(LegacyDiskIndex.size());
- m_LocationMap.clear();
+ // We can use the block as is, just move it and add the blocks to our new log
+ for (auto& Entry : LegacyDiskIndex)
+ {
+ const LegacyCasDiskIndexEntry& Record(Entry.second);
- uint64_t MaxFileOffset = 0;
+ BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize());
+ BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags});
+ }
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+ CreateDirectories(BlockPath.parent_path());
+ BlockFile.Close();
+ std::filesystem::rename(LegacyDataPath, BlockPath);
+ CasLog.Append(LogEntries);
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
- m_CasLog.Replay([&](const CasDiskIndexEntry& Record) {
- if (Record.Flags & CasDiskIndexEntry::kTombstone)
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+ }
+ else
+ {
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(LegacyDiskIndex.size());
+ for (const auto& Entry : LegacyDiskIndex)
{
- m_TotalSize.fetch_sub(Record.Location.GetSize());
+ ChunkHashes.push_back(Entry.first);
}
- else
+
+ // Process the chunks in the order they are laid out in the legacy data file
+ std::sort(begin(ChunkHashes), end(ChunkHashes), [&](const IoHash& Lhs, const IoHash& Rhs) {
+ auto LhsKeyIt = LegacyDiskIndex.find(Lhs);
+ auto RhsKeyIt = LegacyDiskIndex.find(Rhs);
+ return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset();
+ });
+
+ uint64_t BlockSize = 0;
+ uint64_t BlockOffset = 0;
+ // A contiguous range of the legacy data file that becomes one new block file
+ struct BlockData
{
- m_TotalSize.fetch_add(Record.Location.GetSize());
- m_LocationMap[Record.Key] = Record.Location;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize());
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ uint64_t BlockOffset;
+ uint64_t BlockSize;
+ uint32_t BlockIndex;
+ };
+
+ std::vector<BlockData> BlockRanges;
+ std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks;
+ BlockRanges.reserve(MaxRequiredBlockCount);
+ for (const IoHash& ChunkHash : ChunkHashes)
+ {
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash];
+ const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location;
+
+ uint64_t ChunkOffset = LegacyChunkLocation.GetOffset();
+ uint64_t ChunkSize = LegacyChunkLocation.GetSize();
+ uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+ if (BlockSize == 0)
+ {
+ BlockOffset = ChunkOffset;
+ }
+ // Start a new block when this chunk would push the current range past the block limit.
+ // A range that has no size yet is never flushed, that would only emit an empty block.
+ if ((BlockSize > 0) && ((ChunkEnd - BlockOffset) > m_MaxBlockSize))
+ {
+ BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+ BlockRange.Chunks.swap(Chunks);
+ BlockRanges.push_back(BlockRange);
+
+ WriteBlockIndex++;
+ while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex)))
+ {
+ ++WriteBlockIndex;
+ }
+ BlockOffset = ChunkOffset;
+ BlockSize = 0;
+ }
+ BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+ Chunks.push_back({ChunkHash, ChunkLocation});
+ BlockSize = ChunkEnd - BlockOffset;
}
- });
+ if (BlockSize > 0)
+ {
+ BlockRanges.push_back(
+ {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+ }
+ Stopwatch WriteBlockTimer;
+
+ // Copy the ranges starting from the end of the legacy file so the source can be
+ // truncated to BlockRange.BlockOffset after each block when CleanSource is set
+ std::reverse(BlockRanges.begin(), BlockRanges.end());
+ std::vector<std::uint8_t> Buffer(1 << 28);
+ for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+ {
+ const BlockData& BlockRange = BlockRanges[Idx];
+ if (Idx > 0)
+ {
+ // BlockOffset + BlockSize still describe the last range built above, i.e. the total end
+ uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+ uint64_t Completed = BlockOffset + BlockSize - Remaining;
+ uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+ ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+ m_Config.RootDirectory / m_ContainerBaseName,
+ Idx,
+ BlockRanges.size(),
+ NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+ NiceBytes(BlockOffset + BlockSize),
+ NiceTimeSpanMs(ETA));
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex);
+ BlockStoreFile ChunkBlock(BlockPath);
+ ChunkBlock.Create(BlockRange.BlockSize);
+ // Copy the range from the legacy file into the new block in Buffer sized pieces
+ uint64_t Offset = 0;
+ while (Offset < BlockRange.BlockSize)
+ {
+ uint64_t Size = BlockRange.BlockSize - Offset;
+ if (Size > Buffer.size())
+ {
+ Size = Buffer.size();
+ }
+ BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+ ChunkBlock.Write(Buffer.data(), Size, Offset);
+ Offset += Size;
+ }
+ ChunkBlock.Truncate(Offset);
+ ChunkBlock.Flush();
+
+ std::vector<CasDiskIndexEntry> LogEntries;
+ LogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first];
+ BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment);
+ LogEntries.push_back(
+ {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags});
+ }
+ CasLog.Append(LogEntries);
+ for (const CasDiskIndexEntry& Entry : LogEntries)
+ {
+ m_LocationMap.insert_or_assign(Entry.Key, Entry.Location);
+ }
+ MigratedChunkCount += LogEntries.size();
+ MigratedBlockCount++;
+
+ if (CleanSource)
+ {
+ // Tombstone the migrated chunks in the legacy log and cut the copied range off the legacy data file
+ std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries;
+ LegacyLogEntries.reserve(BlockRange.Chunks.size());
+ for (const auto& Entry : BlockRange.Chunks)
+ {
+ LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone});
+ }
+ LegacyCasLog.Append(LegacyLogEntries);
+ BlockFile.SetFileSize(BlockRange.BlockOffset);
+ }
+ }
+ }
- m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1);
- m_CurrentIndexOffset = m_SmallObjectIndex.FileSize();
+ BlockFile.Close();
+ LegacyCasLog.Close();
+ CasLog.Close();
+
+ if (CleanSource)
+ {
+ // Older versions of CasContainerStrategy expects the legacy files to exist if it can find
+ // a CAS manifest and crashes on startup if they don't.
+ // In order to not break startup when switching back an older version, lets just reset
+ // the legacy data files to zero length.
+
+ BasicFile LegacyLog;
+ LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySobs;
+ LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
+ BasicFile LegacySidx;
+ LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate);
+ }
+ return MigratedChunkCount;
}
+// Opens the container: rebuilds m_LocationMap from the index snapshot, the store log and
+// any legacy data, opens the store log for writing, opens every block file that is still
+// referenced and deletes block files that are not referenced by any location.
+// IsNewStore - when true, legacy files and the container base directory are removed first.
void
-CasContainerStrategy::CloseContainer()
+CasContainerStrategy::OpenContainer(bool IsNewStore)
{
- m_SmallObjectFile.Close();
- m_SmallObjectIndex.Close();
- m_CasLog.Close();
+ // Add .running file and delete on clean on close to detect bad termination
+ m_TotalSize = 0;
+
+ m_LocationMap.clear();
+
+ std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ if (IsNewStore)
+ {
+ std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName);
+ std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+
+ std::filesystem::remove(LegacyLogPath);
+ std::filesystem::remove(LegacyDataPath);
+ std::filesystem::remove_all(BasePath);
+ }
+
+ // Snapshot first, then the log entries after the snapshot's log position, then legacy data
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
+ uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+
+ CreateDirectories(BasePath);
+
+ std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName);
+ m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+ // Sum up the stored size and collect the block indexes that are referenced by any location
+ std::unordered_set<uint32_t> KnownBlocks;
+ for (const auto& Entry : m_LocationMap)
+ {
+ const BlockStoreDiskLocation& Location = Entry.second;
+ m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_release);
+ KnownBlocks.insert(Location.GetBlockIndex());
+ }
+
+ if (std::filesystem::is_directory(m_BlocksBasePath))
+ {
+ // Walk the blocks directory tree; sub folders are appended to the list as they are found
+ std::vector<std::filesystem::path> FoldersToScan;
+ FoldersToScan.push_back(m_BlocksBasePath);
+ size_t FolderOffset = 0;
+ while (FolderOffset < FoldersToScan.size())
+ {
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+ {
+ if (Entry.is_directory())
+ {
+ FoldersToScan.push_back(Entry.path());
+ continue;
+ }
+ if (Entry.is_regular_file())
+ {
+ const std::filesystem::path Path = Entry.path();
+ if (Path.extension() != DataExtension)
+ {
+ continue;
+ }
+ // The file name without extension is parsed as the hex encoded block index
+ std::string FileName = Path.stem().string();
+ uint32_t BlockIndex;
+ bool OK = ParseHexNumber(FileName, BlockIndex);
+ if (!OK)
+ {
+ continue;
+ }
+ if (!KnownBlocks.contains(BlockIndex))
+ {
+ // Log removing unreferenced block
+ // Clear out unused blocks
+ ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path);
+ std::error_code Ec;
+ std::filesystem::remove(Path, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+ }
+ continue;
+ }
+ Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+ BlockFile->Open();
+ m_ChunkBlocks[BlockIndex] = BlockFile;
+ }
+ }
+ ++FolderOffset;
+ }
+ }
+ else
+ {
+ CreateDirectories(m_BlocksBasePath);
+ }
+
+ // Write a fresh snapshot when the store is new or when entries were read from the log or migrated
+ if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ {
+ MakeIndexSnapshot();
+ }
+
+ // TODO: should validate integrity of container files here
}
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
-TEST_CASE("cas.compact.gc")
+namespace {
+ // Test helper: builds a buffer of Size bytes holding the repeating byte sequence 0, 1, 2, ...
+ // in shuffled order. The generator is seeded once from std::random_device and shared
+ // between calls.
+ static IoBuffer CreateChunk(uint64_t Size)
+ {
+ static std::random_device SeedSource;
+ static std::mt19937 Generator(SeedSource());
+
+ std::vector<uint8_t> Payload(Size);
+ uint8_t NextByte = 0;
+ for (uint8_t& Byte : Payload)
+ {
+ // Wraps around after 255, same as truncating the element index to 8 bits
+ Byte = NextByte++;
+ }
+ std::shuffle(begin(Payload), end(Payload), Generator);
+
+ return IoBufferBuilder::MakeCloneFromMemory(Payload.data(), Payload.size());
+ }
+} // namespace
+
+// Round-trips 32-bit values through ToHexNumber/ParseHexNumber and checks the produced text
+TEST_CASE("compactcas.hex")
+{
+ uint32_t Parsed;
+ CHECK(!ParseHexNumber("", Parsed));
+ char HexBuffer[9];
+
+ // Formats Input, compares the text against ExpectedText (skipped when null),
+ // then parses the text back and expects the original value
+ const auto CheckRoundTrip = [&](uint32_t Input, const char* ExpectedText) {
+ ToHexNumber(Input, HexBuffer);
+ std::string Text = std::string(HexBuffer);
+ if (ExpectedText != nullptr)
+ {
+ CHECK(Text == ExpectedText);
+ }
+ CHECK(ParseHexNumber(Text, Parsed));
+ CHECK(Parsed == Input);
+ };
+
+ CheckRoundTrip(0u, nullptr);
+ CheckRoundTrip(std::numeric_limits<std::uint32_t>::max(), "ffffffff");
+ CheckRoundTrip(0xadf14711u, "adf14711");
+ CheckRoundTrip(0x80000000u, "80000000");
+ CheckRoundTrip(0x718293a4u, "718293a4");
+}
+
+TEST_CASE("compactcas.compact.gc")
{
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path();
-
CreateDirectories(CasConfig.RootDirectory);
const int kIterationCount = 1000;
@@ -499,7 +1624,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -533,7 +1658,7 @@ TEST_CASE("cas.compact.gc")
{
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -545,67 +1670,843 @@ TEST_CASE("cas.compact.gc")
CHECK_EQ(Value["id"].AsInt32(), i);
}
-
- GcContext Ctx;
- Cas.CollectGarbage(Ctx);
}
}
-TEST_CASE("cas.compact.totalsize")
+TEST_CASE("compactcas.compact.totalsize")
{
std::random_device rd;
std::mt19937 g(rd());
- const auto CreateChunk = [&](uint64_t Size) -> IoBuffer {
- const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
- std::vector<uint32_t> Values;
- Values.resize(Count);
- for (size_t Idx = 0; Idx < Count; ++Idx)
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1024;
+ const int32_t kChunkCount = 16;
+
{
- Values[Idx] = static_cast<uint32_t>(Idx);
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, true);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ const IoHash Hash = HashBuffer(Chunk);
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ }
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
}
- std::shuffle(Values.begin(), Values.end(), g);
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
- };
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+
+ // Re-open again, this time we should have a snapshot
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 65536, 16, false);
+
+ const uint64_t TotalSize = Cas.StorageSize().DiskSize;
+ CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ }
+ }
+}
+
+TEST_CASE("compactcas.gc.basic")  // Inserts one chunk, runs GC without contributing any hash to keep, expects the chunk to be gone
+{
ScopedTemporaryDirectory TempDir;
CasStoreConfiguration CasConfig;
CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, true);  // arguments: container base name, max block size, alignment, is-new-store
+
+ IoBuffer Chunk = CreateChunk(128);
+ IoHash ChunkHash = IoHash::HashBuffer(Chunk);
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(InsertResult.New);  // first insert of this hash must be reported as new
+ Cas.Flush();
+
+ GcContext GcCtx;  // no ContributeCas call is made on this context in this test
+ GcCtx.CollectSmallObjects(true);
+
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHash));  // the only chunk was not listed as kept, so it is expected to be removed
+}
+
+TEST_CASE("compactcas.gc.removefile")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
CreateDirectories(CasConfig.RootDirectory);
- const uint64_t kChunkSize = 1024;
- const int32_t kChunkCount = 16;
+ IoBuffer Chunk = CreateChunk(128);
+ IoHash ChunkHash = IoHash::HashBuffer(Chunk);
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, true);
+
+ const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(InsertResult.New);
+ const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash);
+ CHECK(!InsertResultDup.New);
+ Cas.Flush();
+ }
+
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("cb", 65536, 1 << 4, false);
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHash));
+}
+
+TEST_CASE("compactcas.gc.compact")
+{
+ // for (uint32_t i = 0; i < 100; ++i)
{
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, true);
+ Cas.Initialize("cb", 2048, 1 << 4, true);
- for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(9);
+ for (uint64_t Size : ChunkSizes)
{
- IoBuffer Chunk = CreateChunk(kChunkSize);
- const IoHash Hash = HashBuffer(Chunk);
- auto InsertResult = Cas.InsertChunk(Chunk, Hash);
- ZEN_ASSERT(InsertResult.New);
+ Chunks.push_back(CreateChunk(Size));
}
- const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(9);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New);
+ CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New);
+ CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New);
+ CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New);
+ CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New);
+ CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New);
+ CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New);
+ CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New);
+ CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ uint64_t InitialSize = Cas.StorageSize().DiskSize;
+
+ // Keep first and last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ }
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+
+ // Keep last
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Keep mixed
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[1]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[8], ChunkHashes[8]);
+ }
+
+ // Keep multiple at end
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[7]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(!Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ }
+
+ // Keep every other
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ KeepChunks.push_back(ChunkHashes[0]);
+ KeepChunks.push_back(ChunkHashes[2]);
+ KeepChunks.push_back(ChunkHashes[4]);
+ KeepChunks.push_back(ChunkHashes[6]);
+ KeepChunks.push_back(ChunkHashes[8]);
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ CHECK(Cas.HaveChunk(ChunkHashes[0]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[1]));
+ CHECK(Cas.HaveChunk(ChunkHashes[2]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[3]));
+ CHECK(Cas.HaveChunk(ChunkHashes[4]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[5]));
+ CHECK(Cas.HaveChunk(ChunkHashes[6]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[7]));
+ CHECK(Cas.HaveChunk(ChunkHashes[8]));
+
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
+
+ // Verify that we nicely appended blocks even after all GC operations
+ CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
+ CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
+ CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
+ CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
+ CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
+ CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
+ CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
+ CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+
+ uint64_t FinalSize = Cas.StorageSize().DiskSize;
+ CHECK(InitialSize == FinalSize);
}
+}
+
+TEST_CASE("compactcas.gc.deleteblockonopen")  // GC keeps the even-indexed chunks; the result is verified live and again after re-opening the store
+{
+ ScopedTemporaryDirectory TempDir;
+ uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(20);
+ for (uint64_t Size : ChunkSizes)
{
+ Chunks.push_back(CreateChunk(Size));
+ }
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(20);
+ for (const IoBuffer& Chunk : Chunks)
+ {
+ ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+ }
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+ CreateDirectories(CasConfig.RootDirectory);
+ {
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, true);  // arguments: container base name, max block size, alignment, is-new-store
+
+ for (size_t i = 0; i < 20; i++)
+ {
+ CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+ }
+
+ // GC every other chunk: even indices are contributed as kept, odd indices are left to be collected
+ {
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ std::vector<IoHash> KeepChunks;
+ for (size_t i = 0; i < 20; i += 2)
+ {
+ KeepChunks.push_back(ChunkHashes[i]);
+ }
+ GcCtx.ContributeCas(KeepChunks);
+
+ Cas.Flush();
+ Cas.CollectGarbage(GcCtx);
+
+ for (size_t i = 0; i < 20; i += 2)  // i + 1 stays in range because the chunk count (20) is even
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));  // kept payload must still hash to its key
+ }
+ }
+ }
+ {
+ // Re-open
+ CasGc Gc;
+ CasContainerStrategy Cas(CasConfig, Gc);
+ Cas.Initialize("test", 1024, 16, false);  // is-new-store = false: existing on-disk state is opened
+
+ for (size_t i = 0; i < 20; i += 2)  // same expectations as before the re-open
+ {
+ CHECK(Cas.HaveChunk(ChunkHashes[i]));
+ CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+ CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+ }
+ }
+}
+
+TEST_CASE("compactcas.gc.handleopeniobuffer")
+{
+    // A buffer fetched with FindChunk before a full garbage collection must still hash to its
+    // original key after every chunk has been removed from the store.
+    ScopedTemporaryDirectory TempDir;
+
+    constexpr size_t kChunkCount    = 20;
+    constexpr size_t kRetainedIndex = 5;
+    uint64_t ChunkSizes[kChunkCount] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84};
+
+    // Build every payload together with its hash
+    std::vector<IoBuffer> Chunks;
+    std::vector<IoHash>   ChunkHashes;
+    Chunks.reserve(kChunkCount);
+    ChunkHashes.reserve(kChunkCount);
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        Chunks.push_back(CreateChunk(ChunkSizes[Index]));
+        const IoBuffer& Payload = Chunks.back();
+        ChunkHashes.push_back(IoHash::HashBuffer(Payload.Data(), Payload.Size()));
+    }
+
+    CasStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path();
+    CreateDirectories(CasConfig.RootDirectory);
+
+    CasGc                Gc;
+    CasContainerStrategy Cas(CasConfig, Gc);
+    Cas.Initialize("test", 1024, 16, true);
+
+    for (size_t Index = 0; Index < kChunkCount; ++Index)
+    {
+        CHECK(Cas.InsertChunk(Chunks[Index], ChunkHashes[Index]).New);
+    }
+
+    // Hold on to one chunk across the collection
+    IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[kRetainedIndex]);
+    Cas.Flush();
+
+    // GC everything: nothing is contributed as kept
+    GcContext GcCtx;
+    GcCtx.CollectSmallObjects(true);
+    Cas.CollectGarbage(GcCtx);
+
+    for (const IoHash& Hash : ChunkHashes)
+    {
+        CHECK(!Cas.HaveChunk(Hash));
+    }
+
+    // The retained buffer must still carry the original content
+    CHECK(ChunkHashes[kRetainedIndex] == IoHash::HashBuffer(RetainChunk));
+}
+
+TEST_CASE("compactcas.legacyconversion")
+{
+    // Creates a store in the current block format, garbage collects every other chunk, then rewrites
+    // the on-disk state into the legacy layout (single data file + legacy log) and checks that opening
+    // the container again yields exactly the surviving chunks.
+    // NOTE(review): the conversion itself presumably happens inside Initialize via MigrateLegacyData - confirm.
+    ScopedTemporaryDirectory TempDir;
+
+    uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048};
+    size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
+    size_t SingleBlockSize = 0;
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(ChunkCount);
+    for (uint64_t Size : ChunkSizes)
+    {
+        Chunks.push_back(CreateChunk(Size));
+        SingleBlockSize += Size;
+    }
+
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(ChunkCount);
+    for (const IoBuffer& Chunk : Chunks)
+    {
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    CasStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path();
+    CreateDirectories(CasConfig.RootDirectory);
+
+    {
+        CasGc Gc;
+        CasContainerStrategy Cas(CasConfig, Gc);
+        // Max block size is twice the total payload size
+        Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true);
+
+        for (size_t i = 0; i < ChunkCount; i++)
+        {
+            CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New);
+        }
+
+        // Keep the even-indexed chunks, let GC remove the odd-indexed ones
+        std::vector<IoHash> KeepChunks;
+        for (size_t i = 0; i < ChunkCount; i += 2)
+        {
+            KeepChunks.push_back(ChunkHashes[i]);
+        }
+        GcContext GcCtx;
+        GcCtx.CollectSmallObjects(true);
+        GcCtx.ContributeCas(KeepChunks);
+        Cas.Flush();
+        Gc.CollectGarbage(GcCtx);
+    }
+
+    // Move block file 1 to the legacy data file location
+    std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1);
+    std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test");
+    std::filesystem::rename(BlockPath, LegacyDataPath);
+
+    // Gather all index entries: first from the index snapshot file (if present and valid) ...
+    std::vector<CasDiskIndexEntry> LogEntries;
+    std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test");
+    if (std::filesystem::is_regular_file(IndexPath))
+    {
+        BasicFile ObjectIndexFile;
+        ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+        uint64_t Size = ObjectIndexFile.FileSize();
+        if (Size >= sizeof(CasDiskIndexHeader))
+        {
+            // Everything after the header is entry payload
+            uint64_t ExpectedEntryCount = (Size - sizeof(CasDiskIndexHeader)) / sizeof(CasDiskIndexEntry);
+            CasDiskIndexHeader Header;
+            ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+            if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion &&
+                Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
+            {
+                LogEntries.resize(Header.EntryCount);
+                ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader));
+            }
+        }
+        ObjectIndexFile.Close();
+        std::filesystem::remove(IndexPath);
+    }
+
+    // ... then from the log file
+    std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test");
+    {
+        TCasLogFile<CasDiskIndexEntry> CasLog;
+        CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+        // Make room for the log entries on top of whatever the index file already provided
+        LogEntries.reserve(LogEntries.size() + CasLog.GetLogCount());
+        CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
+    }
+
+    // Write all gathered entries to a fresh legacy log using legacy (offset, size) locations
+    TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog;
+    std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test");
+    LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
+
+    for (const CasDiskIndexEntry& Entry : LogEntries)
+    {
+        BlockStoreLocation Location = Entry.Location.Get(16);
+        LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
+        LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
+                                               .Location = LegacyLocation,
+                                               .ContentType = Entry.ContentType,
+                                               .Flags = Entry.Flags};
+        LegacyCasLog.Append(LegacyEntry);
+    }
+    LegacyCasLog.Close();
+
+    // Remove the "test" directory under the store root so only the legacy files remain
+    std::filesystem::remove_all(CasConfig.RootDirectory / "test");
+
+    {
+        CasGc Gc;
+        CasContainerStrategy Cas(CasConfig, Gc);
+        Cas.Initialize("test", 2048, 16, false);
+
+        for (size_t i = 0; i < ChunkCount; i += 2)
+        {
+            CHECK(Cas.HaveChunk(ChunkHashes[i]));
+            // ChunkCount is odd, so the last kept chunk has no odd-indexed neighbour to inspect
+            if (i + 1 < ChunkCount)
+            {
+                CHECK(!Cas.HaveChunk(ChunkHashes[i + 1]));
+            }
+            CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i])));
+        }
+    }
+}
+
+TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true))
+{
+ // for (uint32_t i = 0; i < 100; ++i)
+ {
+ ScopedTemporaryDirectory TempDir;
+
+ CasStoreConfiguration CasConfig;
+ CasConfig.RootDirectory = TempDir.Path();
+
+ CreateDirectories(CasConfig.RootDirectory);
+
+ const uint64_t kChunkSize = 1048;
+ const int32_t kChunkCount = 8192;
+
+ std::vector<IoHash> ChunkHashes;
+ ChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> Chunks;
+ Chunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ ChunkHashes.emplace_back(Hash);
+ Chunks.emplace_back(Chunk);
+ }
+
+ WorkerThreadPool ThreadPool(4);
CasGc Gc;
CasContainerStrategy Cas(CasConfig, Gc);
- Cas.Initialize("test", 16, false);
+ Cas.Initialize("test", 32768, 16, true);
+ {
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = Chunks[Idx];
+ const IoHash& Hash = ChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
+
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() {
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(ChunkHash);
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ CHECK(ChunkHash == Hash);
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
+
+ std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ {
+ std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end());
+ std::vector<IoHash> NewChunkHashes;
+ NewChunkHashes.reserve(kChunkCount);
+ std::vector<IoBuffer> NewChunks;
+ NewChunks.reserve(kChunkCount);
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ IoBuffer Chunk = CreateChunk(kChunkSize);
+ IoHash Hash = HashBuffer(Chunk);
+ NewChunkHashes.emplace_back(Hash);
+ NewChunks.emplace_back(Chunk);
+ }
+
+ RwLock ChunkHashesLock;
+ std::atomic_uint32_t AddedChunkCount;
+
+ for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
+ {
+ const IoBuffer& Chunk = NewChunks[Idx];
+ const IoHash& Hash = NewChunkHashes[Idx];
+ ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() {
+ CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash);
+ ZEN_ASSERT(InsertResult.New);
+ AddedChunkCount.fetch_add(1);
+ });
+ ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() {
+ IoHash ChunkHash = OldChunkHashes[Idx];
+ IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]);
+ if (Chunk)
+ {
+ CHECK(ChunkHash == IoHash::HashBuffer(Chunk));
+ }
+ });
+ }
+
+ while (AddedChunkCount.load() < kChunkCount)
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 155 == 0)
+ {
+ if (C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ if (C + 3 < KeepHashes.size() - 1)
+ {
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+
+ {
+ std::vector<IoHash> AddedHashes;
+ {
+ RwLock::ExclusiveLockScope _(ChunkHashesLock);
+ AddedHashes.swap(NewChunkHashes);
+ }
+ // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
+ for (const IoHash& ChunkHash : AddedHashes)
+ {
+ if (Cas.HaveChunk(ChunkHash))
+ {
+ GcChunkHashes.emplace(ChunkHash);
+ }
+ }
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
+ {
+ if (C % 77 == 0 && C < KeepHashes.size() - 1)
+ {
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
+ }
+ C++;
+ }
+
+ GcContext GcCtx;
+ GcCtx.CollectSmallObjects(true);
+ GcCtx.ContributeCas(KeepHashes);
+ Cas.CollectGarbage(GcCtx);
+ CasChunkSet& Deleted = GcCtx.DeletedCas();
+ Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ }
+ }
+ {
+ for (const IoHash& ChunkHash : GcChunkHashes)
+ {
+ ThreadPool.ScheduleWork([&Cas, ChunkHash]() {
+ CHECK(Cas.HaveChunk(ChunkHash));
+ CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash)));
+ });
+ }
+ while (ThreadPool.PendingWork() > 0)
+ {
+ Sleep(1);
+ }
+ }
}
}
+TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true))
+{
+    // Skipped by default: operates on a data set at a fixed local path. Opens the "tobs" and "sobs"
+    // containers, records their disk sizes, then re-opens both and checks that a garbage collection
+    // pass leaves the reported sizes unchanged.
+    const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas";
+
+    // Start from a clean slate for both containers
+    std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs");
+    std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
+    std::filesystem::remove_all(TobsBasePath);
+    std::filesystem::remove_all(SobsBasePath);
+
+    CasStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = BigDataPath;
+
+    // First pass: open each container in its own scope and record the size it reports
+    uint64_t TObsSize = 0;
+    {
+        CasGc                FirstTobsGc;
+        CasContainerStrategy FirstTobs(CasConfig, FirstTobsGc);
+        FirstTobs.Initialize("tobs", 1u << 28, 16, false);
+        TObsSize = FirstTobs.StorageSize().DiskSize;
+        CHECK(TObsSize > 0);
+    }
+
+    uint64_t SObsSize = 0;
+    {
+        CasGc                FirstSobsGc;
+        CasContainerStrategy FirstSobs(CasConfig, FirstSobsGc);
+        FirstSobs.Initialize("sobs", 1u << 30, 4096, false);
+        SObsSize = FirstSobs.StorageSize().DiskSize;
+        CHECK(SObsSize > 0);
+    }
+
+    // Second pass: re-open, garbage collect, sizes must match the first pass
+    CasGc                TobsCasGc;
+    CasContainerStrategy TobsCas(CasConfig, TobsCasGc);
+    TobsCas.Initialize("tobs", 1u << 28, 16, false);
+    GcContext TobsGcCtx;
+    TobsCas.CollectGarbage(TobsGcCtx);
+    CHECK(TobsCas.StorageSize().DiskSize == TObsSize);
+
+    CasGc                SobsCasGc;
+    CasContainerStrategy SobsCas(CasConfig, SobsCasGc);
+    SobsCas.Initialize("sobs", 1u << 30, 4096, false);
+    GcContext SobsGcCtx;
+    SobsCas.CollectGarbage(SobsGcCtx);
+    CHECK(SobsCas.StorageSize().DiskSize == SObsSize);
+}
+
#endif
void
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index c039feec9..11da37202 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -3,22 +3,13 @@
#pragma once
#include <zencore/zencore.h>
-
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
-#include <zencore/uid.h>
-#include <zenstore/basicfile.h>
+#include <zenstore/blockstore.h>
#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
#include <atomic>
+#include <limits>
#include <unordered_map>
namespace spdlog {
@@ -32,46 +23,14 @@ namespace zen {
#pragma pack(push)
#pragma pack(1)
-struct CasDiskLocation
-{
- CasDiskLocation(uint64_t InOffset, uint64_t InSize)
- {
- ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
- ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
-
- memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
- memcpy(&m_Size[0], &InSize, sizeof m_Size);
- }
-
- CasDiskLocation() = default;
-
- inline uint64_t GetOffset() const
- {
- uint64_t Offset = 0;
- memcpy(&Offset, &m_Offset, sizeof m_Offset);
- return Offset;
- }
-
- inline uint64_t GetSize() const
- {
- uint64_t Size = 0;
- memcpy(&Size, &m_Size, sizeof m_Size);
- return Size;
- }
-
-private:
- uint8_t m_Offset[5];
- uint8_t m_Size[5];
-};
-
struct CasDiskIndexEntry
{
static const uint8_t kTombstone = 0x01;
- IoHash Key;
- CasDiskLocation Location;
- ZenContentType ContentType = ZenContentType::kUnknownContentType;
- uint8_t Flags = 0;
+ IoHash Key;
+ BlockStoreDiskLocation Location;
+ ZenContentType ContentType = ZenContentType::kUnknownContentType;
+ uint8_t Flags = 0;
};
#pragma pack(pop)
@@ -91,39 +50,46 @@ struct CasContainerStrategy final : public GcStorage
CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc);
~CasContainerStrategy();
- CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(CasChunkSet& InOutChunks);
- void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore);
+ void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
+ virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::acquire)}; }
private:
- void OpenContainer(bool IsNewStore);
- void CloseContainer();
+ CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile();
+ uint64_t ReadLog(uint64_t SkipEntryCount);
+ uint64_t MigrateLegacyData(bool CleanSource);
+ void OpenContainer(bool IsNewStore);
+
spdlog::logger& Log() { return m_Log; }
const CasStoreConfiguration& m_Config;
spdlog::logger& m_Log;
- uint64_t m_PayloadAlignment = 1 << 4;
+ uint64_t m_PayloadAlignment = 1u << 4;
+ uint64_t m_MaxBlockSize = 1u << 28;
bool m_IsInitialized = false;
- BasicFile m_SmallObjectFile;
- BasicFile m_SmallObjectIndex;
TCasLogFile<CasDiskIndexEntry> m_CasLog;
std::string m_ContainerBaseName;
+ std::filesystem::path m_BlocksBasePath;
+
+ RwLock m_LocationMapLock;
+ typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t;
+ LocationMap_t m_LocationMap;
+ std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_LocationMapLock;
- std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap;
- RwLock m_InsertLock; // used to serialize inserts
- std::atomic_uint64_t m_CurrentInsertOffset{};
- std::atomic_uint64_t m_CurrentIndexOffset{};
- std::atomic_uint64_t m_TotalSize{};
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
- void MakeSnapshot();
+ std::atomic_uint32_t m_WriteBlockIndex{};
+ std::atomic_uint64_t m_TotalSize{};
};
void compactcas_forcelink();
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 758c0665b..b53cfaa54 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -12,6 +12,7 @@
#include <zencore/testing.h>
#include <zencore/testutils.h>
#include <zencore/thread.h>
+#include <zencore/timer.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/gc.h>
@@ -88,18 +89,37 @@ FileCasStrategy::Initialize(bool IsNewStore)
CreateDirectories(m_Config.RootDirectory);
- m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore);
+ m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
- m_CasLog.Replay([&](const FileCasIndexEntry& Entry) {
- if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
- {
- m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
- }
- else
- {
- m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
- }
+ Stopwatch Timer;
+ const auto _ = MakeGuard([this, &Timer] {
+ ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
+
+ std::unordered_set<IoHash> FoundEntries;
+ FoundEntries.reserve(10000);
+ m_CasLog.Replay(
+ [&](const FileCasIndexEntry& Entry) {
+ if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
+ {
+ if (!FoundEntries.contains(Entry.Key))
+ {
+ return;
+ }
+ m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
+ FoundEntries.erase(Entry.Key);
+ }
+ else
+ {
+ if (FoundEntries.contains(Entry.Key))
+ {
+ return;
+ }
+ FoundEntries.insert(Entry.Key);
+ m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
+ }
+ },
+ 0);
}
CasStore::InsertResult
@@ -565,7 +585,7 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile&
BasicFile PayloadFile;
std::error_code Ec;
- PayloadFile.Open(Parent / File, false, Ec);
+ PayloadFile.Open(Parent / File, BasicFile::Mode::kWrite, Ec);
if (!Ec)
{
@@ -668,6 +688,20 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
std::vector<IoHash> CandidateCas;
+ uint64_t DeletedCount = 0;
+ uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+ Stopwatch TotalTimer;
+ const auto _ = MakeGuard([this, &TotalTimer, &DeletedCount, &ChunkCount, OldTotalSize] {
+ ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
+ m_Config.RootDirectory,
+ NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+ DeletedCount,
+ ChunkCount,
+ NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)),
+ NiceBytes(OldTotalSize));
+ });
+
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
bool KeepThis = false;
CandidateCas.clear();
@@ -689,16 +723,17 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
ChunkBytes.fetch_add(FileSize);
});
- ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes));
-
if (ChunksToDelete.empty())
{
- ZEN_INFO("nothing to delete");
-
+ ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory);
return;
}
- ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunksToDelete.size(), NiceBytes(ChunksToDeleteBytes));
+ ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
+ m_Config.RootDirectory,
+ ChunksToDelete.size(),
+ ChunkCount.load(),
+ NiceBytes(ChunksToDeleteBytes));
if (GcCtx.IsDeletionMode() == false)
{
@@ -716,8 +751,10 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
if (Ec)
{
- ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message());
+ ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message());
+ continue;
}
+ DeletedCount++;
}
GcCtx.DeletedCas(ChunksToDelete);
@@ -747,7 +784,7 @@ TEST_CASE("cas.file.move")
IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);
BasicFile PayloadFile;
- PayloadFile.Open(Payload1Path, true);
+ PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
PayloadFile.Write(ZeroBytes, 0);
PayloadFile.Close();
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 3b090cae9..287dfb48a 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -5,9 +5,11 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
@@ -18,6 +20,15 @@
#include <fmt/format.h>
#include <filesystem>
+#if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+#else
+# include <fcntl.h>
+# include <sys/file.h>
+# include <sys/stat.h>
+# include <unistd.h>
+#endif
+
#if ZEN_WITH_TESTS
# include <zencore/compress.h>
# include <algorithm>
@@ -31,6 +42,112 @@ namespace fs = std::filesystem;
//////////////////////////////////////////////////////////////////////////
+namespace {
+ // Makes sure a reserve file of exactly Size bytes exists at Path.
+ //
+ // - Size == 0 removes any existing file at Path and reports success.
+ // - A regular file that already has the requested size is left untouched.
+ // - Otherwise the file is created (or reopened) and resized to Size bytes.
+ //
+ // Returns a default-constructed std::error_code on success, otherwise the error reported by the
+ // failing platform call. A file that could not be brought to the requested size is deleted again
+ // so that no wrongly sized reserve is left behind.
+ //
+ // Note: the std::filesystem calls below use the throwing overloads, so their failures surface as
+ // exceptions rather than through the returned error code.
+ std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size)
+ {
+     if (Size == 0)
+     {
+         std::filesystem::remove(Path);
+         return std::error_code{};
+     }
+     CreateDirectories(Path.parent_path());
+     if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size)
+     {
+         return std::error_code();
+     }
+#if ZEN_PLATFORM_WINDOWS
+     DWORD dwCreationDisposition = CREATE_ALWAYS;
+     DWORD dwDesiredAccess       = GENERIC_READ | GENERIC_WRITE;
+
+     const DWORD dwShareMode          = 0;
+     const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
+     HANDLE      hTemplateFile        = nullptr;
+
+     HANDLE FileHandle = CreateFile(Path.c_str(),
+                                    dwDesiredAccess,
+                                    dwShareMode,
+                                    /* lpSecurityAttributes */ nullptr,
+                                    dwCreationDisposition,
+                                    dwFlagsAndAttributes,
+                                    hTemplateFile);
+
+     if (FileHandle == INVALID_HANDLE_VALUE)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+     // Only flipped to true once the file has its final size; on every early return the guard
+     // closes the handle and then deletes the incomplete file.
+     bool Keep = false;
+     auto _    = MakeGuard([FileHandle, &Keep, Path]() {
+         ::CloseHandle(FileHandle);
+         if (!Keep)
+         {
+             ::DeleteFile(Path.c_str());
+         }
+     });
+     LARGE_INTEGER liFileSize;
+     liFileSize.QuadPart = Size;
+     BOOL OK             = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN);
+     if (!OK)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+     OK = ::SetEndOfFile(FileHandle);
+     if (!OK)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+     Keep = true;
+#else
+     int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT;
+     int Fd        = open(Path.c_str(), OpenFlags, 0666);
+     if (Fd < 0)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+
+     // Only flipped to true once the file has its final size; on every early return the guard
+     // closes the descriptor and then unlinks the incomplete file.
+     bool Keep = false;
+     auto _    = MakeGuard([Fd, &Keep, Path]() {
+         close(Fd);
+         if (!Keep)
+         {
+             unlink(Path.c_str());
+         }
+     });
+
+     if (fchmod(Fd, 0666) < 0)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+
+# if ZEN_PLATFORM_MAC
+     // NOTE(review): confirm that posix_fallocate is provided by the macOS SDK in use
+     if (ftruncate(Fd, (off_t)Size) < 0)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+     int Error = posix_fallocate(Fd, 0, (off_t)Size);
+     if (Error)
+     {
+         return MakeErrorCode(Error);
+     }
+# else
+     if (ftruncate64(Fd, (off64_t)Size) < 0)
+     {
+         return MakeErrorCodeFromLastError();
+     }
+     int Error = posix_fallocate64(Fd, 0, (off64_t)Size);
+     if (Error)
+     {
+         return MakeErrorCode(Error);
+     }
+# endif
+     Keep = true;
+#endif
+     return std::error_code{};
+ }
+
+} // namespace
+
+//////////////////////////////////////////////////////////////////////////
+
CbObject
LoadCompactBinaryObject(const fs::path& Path)
{
@@ -74,6 +191,8 @@ struct GcContext::GcState
GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24);
bool m_DeletionMode = true;
bool m_CollectSmallObjects = false;
+
+ std::filesystem::path DiskReservePath;
};
GcContext::GcContext(GcClock::TimePoint Time) : m_State(std::make_unique<GcState>())
@@ -194,6 +313,27 @@ GcContext::MaxCacheDuration(GcClock::Duration Duration)
m_State->m_MaxCacheDuration = Duration;
}
+// Records the path of the disk reserve file; ClaimGCReserve() reads this path when it
+// deletes the reserve.
+void
+GcContext::DiskReservePath(const std::filesystem::path& Path)
+{
+ m_State->DiskReservePath = Path;
+}
+
+// Gives up the disk reserve: deletes the file at the configured reserve path and returns
+// the size it had. Returns 0 when there is no regular file at that path or when nothing
+// was removed.
+uint64_t
+GcContext::ClaimGCReserve()
+{
+    const std::filesystem::path& ReservePath = m_State->DiskReservePath;
+    if (std::filesystem::is_regular_file(ReservePath))
+    {
+        // Size has to be read before the file is removed
+        const uint64_t ReserveSize = std::filesystem::file_size(ReservePath);
+        const bool     WasRemoved  = std::filesystem::remove(ReservePath);
+        return WasRemoved ? ReserveSize : 0;
+    }
+    return 0;
+}
+
//////////////////////////////////////////////////////////////////////////
GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc)
@@ -262,10 +402,13 @@ CasGc::CollectGarbage(GcContext& GcCtx)
RwLock::SharedLockScope _(m_Lock);
// First gather reference set
-
- for (GcContributor* Contributor : m_GcContribs)
{
- Contributor->GatherReferences(GcCtx);
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ for (GcContributor* Contributor : m_GcContribs)
+ {
+ Contributor->GatherReferences(GcCtx);
+ }
}
// Cache records reference CAS chunks with the uncompressed
@@ -300,15 +443,22 @@ CasGc::CollectGarbage(GcContext& GcCtx)
// Then trim storage
- for (GcStorage* Storage : m_GcStorage)
{
- Storage->CollectGarbage(GcCtx);
+ Stopwatch Timer;
+ const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
+ for (GcStorage* Storage : m_GcStorage)
+ {
+ Storage->CollectGarbage(GcCtx);
+ }
}
// Remove Cid to CAS hash mappings. Scrub?
if (CidStore* CidStore = m_CidStore)
{
+ Stopwatch Timer;
+ const auto Guard =
+ MakeGuard([this, &Timer] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
CidStore->RemoveCids(GcCtx.DeletedCas());
}
}
@@ -379,6 +529,15 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config)
std::filesystem::create_directories(Config.RootDirectory);
+ std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
+ if (Ec)
+ {
+ ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'",
+ m_Config.RootDirectory / "reserve.gc",
+ NiceBytes(m_Config.DiskReserveSize),
+ Ec.message());
+ }
+
m_LastGcTime = GcClock::Now();
if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state"))
@@ -475,7 +634,7 @@ GcScheduler::SchedulerThread()
if (Ec)
{
- ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message());
+ ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message());
}
ZEN_INFO("{} in use, {} of total {} free disk space, {}",
@@ -506,6 +665,7 @@ GcScheduler::SchedulerThread()
GcCtx.SetDeletionMode(true);
GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects);
GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration);
+ GcCtx.DiskReservePath(m_Config.RootDirectory / "reserve.gc");
if (m_TriggerParams)
{
@@ -519,27 +679,37 @@ GcScheduler::SchedulerThread()
}
}
- Stopwatch Timer;
-
ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}",
GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv,
NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count())));
+ {
+ Stopwatch Timer;
+ const auto __ =
+ MakeGuard([this, &Timer] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- m_CasGc.CollectGarbage(GcCtx);
+ m_CasGc.CollectGarbage(GcCtx);
- m_LastGcTime = GcClock::Now();
- m_NextGcTime = NextGcTime(m_LastGcTime);
- WaitTime = m_Config.MonitorInterval;
+ m_LastGcTime = GcClock::Now();
+ m_NextGcTime = NextGcTime(m_LastGcTime);
+ WaitTime = m_Config.MonitorInterval;
- {
- const fs::path Path = m_Config.RootDirectory / "gc_state";
- ZEN_DEBUG("saving scheduler state to '{}'", Path);
- CbObjectWriter SchedulderState;
- SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
- SaveCompactBinaryObject(Path, SchedulderState.Save());
- }
+ {
+ const fs::path Path = m_Config.RootDirectory / "gc_state";
+ ZEN_DEBUG("saving scheduler state to '{}'", Path);
+ CbObjectWriter SchedulderState;
+ SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count());
+ SaveCompactBinaryObject(Path, SchedulderState.Save());
+ }
- ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize);
+ if (Ec)
+ {
+ ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'",
+ m_Config.RootDirectory / "reserve.gc",
+ NiceBytes(m_Config.DiskReserveSize),
+ Ec.message());
+ }
+ }
uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning);
if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle)))
@@ -573,16 +743,15 @@ namespace {
static std::random_device rd;
static std::mt19937 g(rd());
- const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t));
- std::vector<uint32_t> Values;
- Values.resize(Count);
- for (size_t Idx = 0; Idx < Count; ++Idx)
+ std::vector<uint8_t> Values;
+ Values.resize(Size);
+ for (size_t Idx = 0; Idx < Size; ++Idx)
{
- Values[Idx] = static_cast<uint32_t>(Idx);
+ Values[Idx] = static_cast<uint8_t>(Idx);
}
std::shuffle(Values.begin(), Values.end(), g);
- return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t));
+ return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
}
static CompressedBuffer Compress(IoBuffer Buffer)
@@ -613,11 +782,209 @@ TEST_CASE("gc.basic")
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
+ CasStore->Flush();
Gc.CollectGarbage(GcCtx);
CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId));
}
+// Runs four garbage collections over a CAS store holding nine small chunks. Each round keeps a
+// different subset, checks which chunks survived and that the survivors still hash correctly, and
+// then re-inserts chunks. Finally verifies that all nine chunks are readable and that the reported
+// tiny size equals the one measured before the first collection.
+TEST_CASE("gc.full")
+{
+    ScopedTemporaryDirectory TempDir;
+
+    CasStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+    CasGc                     Gc;
+    std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc);
+
+    CasStore->Initialize(CasConfig);
+
+    constexpr size_t ChunkCount             = 9;
+    const uint64_t   ChunkSizes[ChunkCount] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
+
+    // Chunks are created in index order, then hashed in index order
+    std::vector<IoBuffer> Chunks;
+    Chunks.reserve(ChunkCount);
+    for (size_t Idx = 0; Idx < ChunkCount; ++Idx)
+    {
+        Chunks.push_back(CreateChunk(ChunkSizes[Idx]));
+    }
+    std::vector<IoHash> ChunkHashes;
+    ChunkHashes.reserve(ChunkCount);
+    for (size_t Idx = 0; Idx < ChunkCount; ++Idx)
+    {
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunks[Idx].Data(), Chunks[Idx].Size()));
+    }
+
+    // Inserts the chunks with the given indices, in the order listed
+    const auto InsertChunks = [&](const std::vector<size_t>& Indices) {
+        for (size_t Idx : Indices)
+        {
+            CasStore->InsertChunk(Chunks[Idx], ChunkHashes[Idx]);
+        }
+    };
+
+    // True when the stored payload for chunk Idx hashes back to its key
+    const auto PayloadMatchesHash = [&](size_t Idx) {
+        return ChunkHashes[Idx] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[Idx]));
+    };
+
+    // Collects garbage while referencing only the chunks in KeepIndices (ascending order), then
+    // checks presence of every chunk followed by the content of the kept ones
+    const auto CollectKeeping = [&](const std::vector<size_t>& KeepIndices) {
+        GcContext GcCtx;
+        GcCtx.CollectSmallObjects(true);
+
+        std::vector<IoHash> KeepChunks;
+        for (size_t Idx : KeepIndices)
+        {
+            KeepChunks.push_back(ChunkHashes[Idx]);
+        }
+        GcCtx.ContributeCas(KeepChunks);
+
+        CasStore->Flush();
+        Gc.CollectGarbage(GcCtx);
+
+        for (size_t Idx = 0; Idx < ChunkCount; ++Idx)
+        {
+            const bool ExpectKept = std::find(KeepIndices.begin(), KeepIndices.end(), Idx) != KeepIndices.end();
+            CHECK(CasStore->ContainsChunk(ChunkHashes[Idx]) == ExpectKept);
+        }
+        for (size_t Idx : KeepIndices)
+        {
+            CHECK(PayloadMatchesHash(Idx));
+        }
+    };
+
+    InsertChunks({0, 1, 2, 3, 4, 5, 6, 7, 8});
+
+    CasStoreSize InitialSize = CasStore->TotalSize();
+
+    // Keep first and last
+    CollectKeeping({0, 8});
+    InsertChunks({1, 2, 3, 4, 5, 6, 7});
+
+    // Keep last
+    CollectKeeping({8});
+    InsertChunks({1, 2, 3, 4, 5, 6, 7});
+
+    // Keep mixed
+    CollectKeeping({1, 4, 7});
+    InsertChunks({0, 2, 3, 5, 6, 8});
+
+    // Keep multiple at end
+    CollectKeeping({6, 7, 8});
+    InsertChunks({0, 1, 2, 3, 4, 5});
+
+    // Verify that we nicely appended blocks even after all GC operations
+    for (size_t Idx = 0; Idx < ChunkCount; ++Idx)
+    {
+        CHECK(PayloadMatchesHash(Idx));
+    }
+
+    auto FinalSize = CasStore->TotalSize();
+    CHECK(InitialSize.TinySize == FinalSize.TinySize);
+}
#endif
void
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index 2df016c76..5a500c65f 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -31,8 +31,17 @@ public:
BasicFile(const BasicFile&) = delete;
BasicFile& operator=(const BasicFile&) = delete;
- void Open(std::filesystem::path FileName, bool IsCreate);
- void Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec);
+ // Selects how Open() opens the file
+ enum class Mode : uint32_t
+ {
+ kRead = 0, // Opens an existing file for read only
+ kWrite = 1, // Opens (or creates) a file for read and write
+ kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero
+ kDelete = 3, // Opens (or creates) a file for read and write enabling MarkAsDeleteOnClose()
+ kTruncateDelete = 4 // Opens (or creates) a file for read and write and sets the size to zero enabling MarkAsDeleteOnClose()
+ };
+
+ void Open(const std::filesystem::path& FileName, Mode Mode);
+ void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec);
void Close();
void Read(void* Data, uint64_t Size, uint64_t FileOffset);
void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
@@ -43,13 +52,17 @@ public:
void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
void Flush();
uint64_t FileSize();
+ void SetFileSize(uint64_t FileSize);
IoBuffer ReadAll();
void WriteAll(IoBuffer Data, std::error_code& Ec);
+ void MarkAsDeleteOnClose(std::error_code& Ec);
+ void* Detach();
inline void* Handle() { return m_FileHandle; }
protected:
void* m_FileHandle = nullptr; // This is either null or valid
+private:
};
/**
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
new file mode 100644
index 000000000..424db461a
--- /dev/null
+++ b/zenstore/include/zenstore/blockstore.h
@@ -0,0 +1,104 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/filesystem.h>
+#include <zencore/zencore.h>
+#include <zenstore/basicfile.h>
+
+namespace zen {
+
+//////////////////////////////////////////////////////////////////////////
+
+// Unpacked location triple; BlockStoreDiskLocation converts to and from this form.
+struct BlockStoreLocation
+{
+ uint32_t BlockIndex; // Index of the block (must fit in BlockStoreDiskLocation::MaxBlockIndexBits bits to be packed)
+ uint64_t Offset; // Offset; divided by the offset alignment when packed - presumably bytes within the block, TODO confirm
+ uint64_t Size; // Size; must fit in 32 bits to be packed - presumably bytes, TODO confirm
+};
+
+#pragma pack(push)
+#pragma pack(1)
+
+// Compact form of a BlockStoreLocation: a 32-bit size plus sizeof(m_Offset) bytes copied from a
+// 64-bit value composed as (BlockIndex << MaxOffsetBits) + (Offset / OffsetAlignment).
+struct BlockStoreDiskLocation
+{
+    constexpr static uint32_t MaxBlockIndexBits = 20;
+    constexpr static uint32_t MaxOffsetBits     = 28;
+    constexpr static uint32_t MaxBlockIndex     = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul;
+    constexpr static uint32_t MaxOffset         = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul;
+
+    BlockStoreDiskLocation() = default;
+
+    // Packs Location; the offset is stored in units of OffsetAlignment
+    BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment)
+    {
+        Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size);
+    }
+
+    // Block index stored above the offset bits
+    inline uint32_t GetBlockIndex() const { return static_cast<std::uint32_t>(LoadPacked() >> MaxOffsetBits); }
+
+    // Stored offset scaled back up by OffsetAlignment
+    inline uint64_t GetOffset(uint64_t OffsetAlignment) const { return (LoadPacked() & MaxOffset) * OffsetAlignment; }
+
+    inline uint64_t GetSize() const { return m_Size; }
+
+    // Expands all three fields at once
+    inline BlockStoreLocation Get(uint64_t OffsetAlignment) const
+    {
+        const uint64_t Packed = LoadPacked();
+        return {.BlockIndex = static_cast<std::uint32_t>(Packed >> MaxOffsetBits),
+                .Offset     = (Packed & MaxOffset) * OffsetAlignment,
+                .Size       = GetSize()};
+    }
+
+private:
+    // Copies the stored bytes into a zero-initialised 64-bit value
+    inline uint64_t LoadPacked() const
+    {
+        uint64_t Packed = 0;
+        memcpy(&Packed, &m_Offset, sizeof m_Offset);
+        return Packed;
+    }
+
+    // Validates the ranges and stores the size and the packed block index / offset
+    inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size)
+    {
+        ZEN_ASSERT(BlockIndex <= MaxBlockIndex);
+        ZEN_ASSERT(Offset <= MaxOffset);
+        ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max());
+
+        const uint64_t Packed = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset;
+        memcpy(&m_Offset[0], &Packed, sizeof m_Offset);
+        m_Size = static_cast<uint32_t>(Size);
+    }
+
+    uint32_t m_Size;
+    uint8_t  m_Offset[6];
+};
+
+#pragma pack(pop)
+
+// Reference-counted (derives from RefCounted) object for one block file identified by a path.
+// Only declarations are visible here; the implementation lives outside this header.
+struct BlockStoreFile : public RefCounted
+{
+ explicit BlockStoreFile(const std::filesystem::path& BlockPath);
+ ~BlockStoreFile();
+ const std::filesystem::path& GetPath() const;
+ void Open();
+ void Create(uint64_t InitialSize);
+ void MarkAsDeleteOnClose(std::error_code& Ec);
+ uint64_t FileSize();
+ IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
+ void Read(void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
+ void Truncate(uint64_t Size);
+ void Flush();
+ // ChunkFun receives a data pointer and a size per call - presumably once per streamed piece of the range, TODO confirm
+ void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
+
+private:
+ const std::filesystem::path m_Path; // Path of the block file, fixed for the lifetime of the object
+ IoBuffer m_IoBuffer;
+ BasicFile m_File;
+};
+
+void blockstore_forcelink();
+
+} // namespace zen
diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h
index 7a7233c1c..5592fbd0a 100644
--- a/zenstore/include/zenstore/cas.h
+++ b/zenstore/include/zenstore/cas.h
@@ -69,7 +69,7 @@ public:
private:
// Q: should we protect this with a lock, or is that a higher level concern?
- std::unordered_set<IoHash> m_ChunkSet;
+ std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet;
};
/** Context object for data scrubbing
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 1bd11800c..4b93a708f 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -4,18 +4,8 @@
#include "zenstore.h"
-#include <zencore/iobuffer.h>
-#include <zencore/string.h>
-#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
-#include <functional>
namespace zen {
@@ -25,12 +15,20 @@ public:
CasLogFile();
~CasLogFile();
- void Open(std::filesystem::path FileName, size_t RecordSize, bool isCreate);
+ // Selects how Open() opens the log file
+ enum class Mode
+ {
+ kRead,
+ kWrite,
+ kTruncate
+ };
+
+ void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode);
void Append(const void* DataPointer, uint64_t DataSize);
- void Replay(std::function<void(const void*)>&& Handler);
+ void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount);
void Flush();
void Close();
uint64_t GetLogSize();
+ uint64_t GetLogCount();
private:
struct FileHeader
@@ -51,6 +49,8 @@ private:
static_assert(sizeof(FileHeader) == 64);
private:
+ void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode);
+
BasicFile m_File;
FileHeader m_Header;
size_t m_RecordSize = 1;
@@ -61,18 +61,20 @@ template<typename T>
class TCasLogFile : public CasLogFile
{
public:
- void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); }
+ void Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); }
// This should be called before the Replay() is called to do some basic sanity checking
bool Initialize() { return true; }
- void Replay(Invocable<const T&> auto Handler)
+ void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount)
{
- CasLogFile::Replay([&](const void* VoidPtr) {
- const T& Record = *reinterpret_cast<const T*>(VoidPtr);
+ CasLogFile::Replay(
+ [&](const void* VoidPtr) {
+ const T& Record = *reinterpret_cast<const T*>(VoidPtr);
- Handler(Record);
- });
+ Handler(Record);
+ },
+ SkipEntryCount);
}
void Append(const T& Record)
@@ -82,6 +84,8 @@ public:
CasLogFile::Append(&Record, sizeof Record);
}
+
+ // Appends every record in the span with a single CasLogFile::Append call
+ void Append(const std::span<T>& Records) { CasLogFile::Append(Records.data(), sizeof(T) * Records.size()); }
};
} // namespace zen
diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h
index b8ba338f0..bc8dee9a3 100644
--- a/zenstore/include/zenstore/gc.h
+++ b/zenstore/include/zenstore/gc.h
@@ -78,6 +78,9 @@ public:
GcClock::Duration MaxCacheDuration() const;
void MaxCacheDuration(GcClock::Duration Duration);
+ void DiskReservePath(const std::filesystem::path& Path);
+ uint64_t ClaimGCReserve();
+
inline bool Expired(GcClock::Tick TickCount) { return Time() - GcClock::TimePointFromTick(TickCount) > MaxCacheDuration(); }
private:
@@ -170,6 +173,7 @@ struct GcSchedulerConfig
std::chrono::seconds MaxCacheDuration{86400};
bool CollectSmallObjects = true;
bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
};
/**
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index dbb3dbbf7..5f40b7f60 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -3,6 +3,7 @@
#include "zenstore/zenstore.h"
#include <zenstore/basicfile.h>
+#include <zenstore/blockstore.h>
#include <zenstore/cas.h>
#include <zenstore/gc.h>
#include "compactcas.h"
@@ -16,6 +17,7 @@ zenstore_forcelinktests()
basicfile_forcelink();
CAS_forcelink();
filecas_forcelink();
+ blockstore_forcelink();
compactcas_forcelink();
gc_forcelink();
}