diff options
| author | Dan Engelbrecht <[email protected]> | 2022-04-07 18:22:26 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-04-07 18:22:26 +0200 |
| commit | 487352bb3e1de5a96268616bb335f9ef857cd629 (patch) | |
| tree | 4b03eb73f02bf03d1b4671775c1c5277a7d7341a | |
| parent | Add pre-commit config (#69) (diff) | |
| parent | clean up variable naming (diff) | |
| download | zen-487352bb3e1de5a96268616bb335f9ef857cd629.tar.xz zen-487352bb3e1de5a96268616bb335f9ef857cd629.zip | |
Merge pull request #58 from EpicGames/de/cas-store-with-block-store
de/cas store with block store
| -rw-r--r-- | zencore/filesystem.cpp | 4 | ||||
| -rw-r--r-- | zencore/include/zencore/string.h | 105 | ||||
| -rw-r--r-- | zencore/iobuffer.cpp | 5 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 60 | ||||
| -rw-r--r-- | zenserver/config.cpp | 8 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 58 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 1 | ||||
| -rw-r--r-- | zenstore/basicfile.cpp | 177 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 242 | ||||
| -rw-r--r-- | zenstore/cas.cpp | 8 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 46 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 38 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 2443 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 90 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 73 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 419 | ||||
| -rw-r--r-- | zenstore/include/zenstore/basicfile.h | 17 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 104 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cas.h | 2 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 40 | ||||
| -rw-r--r-- | zenstore/include/zenstore/gc.h | 4 | ||||
| -rw-r--r-- | zenstore/zenstore.cpp | 2 |
23 files changed, 3425 insertions, 522 deletions
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index 041abaf1d..e2778089b 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -921,6 +921,10 @@ PathFromHandle(void* NativeHandle) } const DWORD RequiredLengthIncludingNul = GetFinalPathNameByHandleW(NativeHandle, nullptr, 0, FILE_NAME_OPENED); + if (RequiredLengthIncludingNul == 0) + { + ThrowLastError(fmt::format("failed to get path from file handle {}", NativeHandle)); + } std::wstring FullPath; FullPath.resize(RequiredLengthIncludingNul - 1); diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 4c378730f..e502120ac 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -9,6 +9,7 @@ #include <string.h> #include <charconv> #include <codecvt> +#include <concepts> #include <optional> #include <span> #include <string_view> @@ -488,6 +489,26 @@ std::string WideToUtf8(const wchar_t* wstr); void WideToUtf8(const std::wstring_view& wstr, StringBuilderBase& out); std::string WideToUtf8(const std::wstring_view Wstr); +inline uint8_t +Char2Nibble(char c) +{ + if (c >= '0' && c <= '9') + { + return uint8_t(c - '0'); + } + if (c >= 'a' && c <= 'f') + { + return uint8_t(c - 'a' + 10); + } + if (c >= 'A' && c <= 'F') + { + return uint8_t(c - 'A' + 10); + } + return uint8_t(0xff); +}; + +static constexpr const char HexChars[] = "0123456789abcdef"; + /// <summary> /// Parse hex string into a byte buffer /// </summary> @@ -501,38 +522,56 @@ ParseHexBytes(const char* InputString, size_t CharacterCount, uint8_t* OutPtr) { ZEN_ASSERT((CharacterCount & 1) == 0); - auto char2nibble = [](char c) { - uint8_t c8 = uint8_t(c - '0'); + uint8_t allBits = 0; - if (c8 < 10) - return c8; + while (CharacterCount) + { + uint8_t n0 = Char2Nibble(InputString[0]); + uint8_t n1 = Char2Nibble(InputString[1]); - c8 -= 'A' - '0' - 10; + allBits |= n0 | n1; - if (c8 < 16) - return c8; + *OutPtr = (n0 << 4) | n1; - c8 -= 'a' - 'A'; + OutPtr += 1; + InputString += 2; + CharacterCount -= 2; + } - if (c8 < 16) - return c8; + return (allBits & 0x80) == 0; +} - return uint8_t(0xff); - }; +inline void +ToHexBytes(const uint8_t* InputData, size_t ByteCount, char* OutString) +{ + while (ByteCount--) + { + uint8_t byte = *InputData++; + + *OutString++ = HexChars[byte >> 4]; + *OutString++ = HexChars[byte & 15]; + } +} + +inline bool +ParseHexNumber(const char* InputString, size_t CharacterCount, uint8_t* OutPtr) +{ + ZEN_ASSERT((CharacterCount & 1) == 0); uint8_t allBits = 0; + InputString += CharacterCount; while (CharacterCount) { - uint8_t n0 = char2nibble(InputString[0]); - uint8_t n1 = char2nibble(InputString[1]); + InputString -= 2; + uint8_t n0 = Char2Nibble(InputString[0]); + uint8_t n1 = Char2Nibble(InputString[1]); allBits |= n0 | n1; *OutPtr = (n0 << 4) | n1; OutPtr += 1; - InputString += 2; CharacterCount -= 2; } @@ -540,19 +579,43 @@ ParseHexBytes(const char* InputString, size_t CharacterCount, uint8_t* OutPtr) } inline void -ToHexBytes(const uint8_t* InputData, size_t ByteCount, char* OutString) +ToHexNumber(const uint8_t* InputData, size_t ByteCount, char* OutString) { - const char hexchars[] = "0123456789abcdef"; - + InputData += ByteCount; while (ByteCount--) { - uint8_t byte = *InputData++; + uint8_t byte = *(--InputData); - *OutString++ = hexchars[byte >> 4]; - *OutString++ = hexchars[byte & 15]; + *OutString++ = HexChars[byte >> 4]; + *OutString++ = HexChars[byte & 15]; } } +/// <summary> +/// Generates a hex number from a pointer to an integer type, this formats the number in the correct order for a hexadecimal number +/// </summary> +/// <param name="Value">Integer value type</param> +/// <param name="outString">Output buffer where resulting string is written</param> +void +ToHexNumber(std::unsigned_integral auto Value, char* OutString) +{ + ToHexNumber((const uint8_t*)&Value, sizeof(Value), OutString); + OutString[sizeof(Value) * 2] = 0; +} + +/// <summary> +/// Parse hex number string into a value, this formats the number in the correct order for a hexadecimal number +/// </summary> +/// <param name="string">Input string</param> +/// <param name="characterCount">Number of characters in string</param> +/// <param name="OutValue">Pointer to output value</param> +/// <returns>true if the input consisted of all valid hexadecimal characters</returns> +bool +ParseHexNumber(const std::string HexString, std::unsigned_integral auto& OutValue) +{ + return ParseHexNumber(HexString.c_str(), sizeof(OutValue) * 2, (uint8_t*)&OutValue); +} + ////////////////////////////////////////////////////////////////////////// // Format numbers for humans // diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index e2aaa3169..8a3ab8427 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -186,7 +186,7 @@ IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, ui , m_FileHandle(Outer->m_FileHandle) , m_FileOffset(Outer->m_FileOffset + Offset) { - m_Flags.fetch_or(kIsOwnedByThis | kIsExtended, std::memory_order_relaxed); + m_Flags.fetch_or(kIsExtended, std::memory_order_relaxed); } IoBufferExtendedCore::~IoBufferExtendedCore() @@ -217,10 +217,9 @@ IoBufferExtendedCore::~IoBufferExtendedCore() int Fd = int(uintptr_t(m_FileHandle)); bool Success = (close(Fd) == 0); #endif - if (!Success) { - ZEN_WARN("Error reported on file handle close!"); + ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); } } diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 769167433..738e4c1fd 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -59,7 +59,11 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object) WriteFile(Path, Object.GetBuffer().AsIoBuffer()); } -ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) : GcStorage(Gc), GcContributor(Gc), m_DiskLayer(RootDir) +ZenCacheStore::ZenCacheStore(CasGc& Gc, const std::filesystem::path& RootDir) +: GcStorage(Gc) +, GcContributor(Gc) +, m_RootDir(RootDir) +, m_DiskLayer(RootDir) { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); @@ -188,6 +192,10 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) void ZenCacheStore::GatherReferences(GcContext& GcCtx) { + Stopwatch Timer; + const auto Guard = MakeGuard( + [this, &Timer] { ZEN_INFO("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + access_tracking::AccessTimes AccessTimes; m_MemLayer.GatherAccessTimes(AccessTimes); @@ -476,25 +484,27 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is std::filesystem::path SobsPath{BucketDir / "zen.sobs"}; std::filesystem::path SlogPath{BucketDir / "zen.slog"}; - m_SobsFile.Open(SobsPath, IsNew); - m_SlogFile.Open(SlogPath, IsNew); + m_SobsFile.Open(SobsPath, IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); + m_SlogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - m_SlogFile.Replay([&](const DiskIndexEntry& Entry) { - if (Entry.Key == IoHash::Zero) - { - ++InvalidEntryCount; - } - else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) - { - m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - } - else - { - m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); - m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); - } - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); - }); + m_SlogFile.Replay( + [&](const DiskIndexEntry& Entry) { + if (Entry.Key == IoHash::Zero) + { + ++InvalidEntryCount; + } + else if (Entry.Location.IsFlagSet(DiskLocation::kTombStone)) + { + m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + } + else + { + m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount())); + m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed); + } + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Entry.Location.Offset() + Entry.Location.Size()); + }, + 0); if (InvalidEntryCount) { @@ -757,6 +767,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::DiskLayer::CacheBucket::GatherReferences"); + Stopwatch Timer; + const auto Guard = MakeGuard( + [this, &Timer] { ZEN_INFO("gathered references from '{}' in {}", m_BucketDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + const GcClock::TimePoint ExpireTime = GcCtx.MaxCacheDuration() == GcClock::Duration::max() ? GcClock::TimePoint::min() : GcCtx.Time() - GcCtx.MaxCacheDuration(); @@ -905,8 +919,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) m_SlogFile.Close(); const bool IsNew = true; - m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew); - m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew); + m_SobsFile.Open(m_BucketDir / "zen.sobs", IsNew ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); + m_SlogFile.Open(m_BucketDir / "zen.slog", IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_SobsCursor = 0; m_TotalSize = 0; @@ -967,8 +981,8 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) uint64_t TmpCursor{}; std::vector<uint8_t> Chunk; - TmpSobs.Open(TmpSobsPath, true); - TmpLog.Open(TmpSlogPath, true); + TmpSobs.Open(TmpSobsPath, BasicFile::Mode::kTruncate); + TmpLog.Open(TmpSlogPath, CasLogFile::Mode::kTruncate); for (const auto& Entry : ValidEntries) { diff --git a/zenserver/config.cpp b/zenserver/config.cpp index b7fc18b4e..ac0f863cc 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -428,6 +428,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) "Max duration in seconds before Z$ entries get evicted.", cxxopts::value<int32_t>(ServerOptions.GcConfig.Cache.MaxDurationSeconds)->default_value("86400"), ""); + + options.add_option("gc", + "", + "disk-reserve-size", + "Size of gc disk reserve in bytes.", + cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"), + ""); try { auto result = options.parse(argc, argv); @@ -699,6 +706,7 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (sol::optional<sol::table> GcConfig = lua["gc"]) { ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); + ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"]) { diff --git a/zenserver/config.h b/zenserver/config.h index a61a7f89f..9f1b3645c 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -91,6 +91,7 @@ struct ZenGcConfig int32_t IntervalSeconds = 0; bool CollectSmallObjects = true; bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; }; struct ZenServerOptions diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 58b806989..617f50660 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/testing.h> @@ -114,10 +115,10 @@ struct ProjectStore::OplogStorage : public RefCounted CreateDirectories(m_OplogStoragePath); } - m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate); + m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); m_Oplog.Initialize(); - m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate); + m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite); ZEN_ASSERT(IsPow2(m_OpsAlign)); ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1))); @@ -180,36 +181,39 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t InvalidEntries = 0; - m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) { - if (LogEntry.OpCoreSize == 0) - { - ++InvalidEntries; + m_Oplog.Replay( + [&](const zen::OplogEntry& LogEntry) { + if (LogEntry.OpCoreSize == 0) + { + ++InvalidEntries; - return; - } + return; + } - IoBuffer OpBuffer(LogEntry.OpCoreSize); + IoBuffer OpBuffer(LogEntry.OpCoreSize); - const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; + const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); - // Verify checksum, ignore op data if incorrect - const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); + // Verify checksum, ignore op data if incorrect + const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF); - if (OpCoreHash != LogEntry.OpCoreHash) - { - ZEN_WARN("skipping oplog entry with bad checksum!"); - return; - } + if (OpCoreHash != LogEntry.OpCoreHash) + { + ZEN_WARN("skipping oplog entry with bad checksum!"); + return; + } - CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); + CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size())); - m_NextOpsOffset = Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); - m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); + m_NextOpsOffset = + Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign)); + m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn); - Handler(Op, LogEntry); - }); + Handler(Op, LogEntry); + }, + 0); if (InvalidEntries) { @@ -653,7 +657,7 @@ ProjectStore::Project::Read() ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath); BasicFile Blob; - Blob.Open(ProjectStateFilePath, false); + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead); IoBuffer Obj = Blob.ReadAll(); CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All); @@ -693,7 +697,7 @@ ProjectStore::Project::Write() ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath); BasicFile Blob; - Blob.Open(ProjectStateFilePath, true); + Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate); Blob.Write(Mem.Data(), Mem.Size(), 0); Blob.Flush(); } @@ -970,6 +974,10 @@ ProjectStore::Scrub(ScrubContext& Ctx) void ProjectStore::GatherReferences(GcContext& GcCtx) { + Stopwatch Timer; + const auto Guard = + MakeGuard([this, &Timer] { ZEN_INFO("project store gathered all references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + DiscoverProjects(); RwLock::SharedLockScope _(m_ProjectsLock); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 667bcd317..f81deb167 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -369,6 +369,7 @@ public: .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, + .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, }; m_GcScheduler.Initialize(GcConfig); diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index 895db6cee..e795b67eb 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -29,10 +29,10 @@ BasicFile::~BasicFile() } void -BasicFile::Open(std::filesystem::path FileName, bool IsCreate) +BasicFile::Open(const std::filesystem::path& FileName, Mode Mode) { std::error_code Ec; - Open(FileName, IsCreate, Ec); + Open(FileName, Mode, Ec); if (Ec) { @@ -41,22 +41,41 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate) } void -BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec) +BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec) { Ec.clear(); #if ZEN_PLATFORM_WINDOWS - const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING; - DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; - const DWORD dwShareMode = FILE_SHARE_READ; - const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; - HANDLE hTemplateFile = nullptr; - - if (IsCreate) + DWORD dwCreationDisposition = 0; + DWORD dwDesiredAccess = 0; + switch (Mode) { - dwDesiredAccess |= DELETE; + case Mode::kRead: + dwCreationDisposition |= OPEN_EXISTING; + dwDesiredAccess |= GENERIC_READ; + break; + case Mode::kWrite: + dwCreationDisposition |= OPEN_ALWAYS; + dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE); + break; + case Mode::kDelete: + dwCreationDisposition |= OPEN_ALWAYS; + dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE); + break; + case Mode::kTruncate: + dwCreationDisposition |= CREATE_ALWAYS; + dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE); + break; + case Mode::kTruncateDelete: + dwCreationDisposition |= CREATE_ALWAYS; + dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE); + break; } + const DWORD dwShareMode = FILE_SHARE_READ; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + HANDLE hTemplateFile = nullptr; + HANDLE FileHandle = CreateFile(FileName.c_str(), dwDesiredAccess, dwShareMode, @@ -67,21 +86,34 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& if (FileHandle == INVALID_HANDLE_VALUE) { - Ec = zen::MakeErrorCodeFromLastError(); + Ec = MakeErrorCodeFromLastError(); return; } #else - int OpenFlags = O_RDWR | O_CLOEXEC; - OpenFlags |= IsCreate ? O_CREAT | O_TRUNC : 0; + int OpenFlags = O_CLOEXEC; + switch (Mode) + { + case Mode::kRead: + OpenFlags |= O_RDONLY; + break; + case Mode::kWrite: + case Mode::kDelete: + OpenFlags |= (O_RDWR | O_CREAT); + break; + case Mode::kTruncate: + case Mode::kTruncateDelete: + OpenFlags |= (O_RDWR | O_CREAT | O_TRUNC); + break; + } int Fd = open(FileName.c_str(), OpenFlags, 0666); if (Fd < 0) { - Ec = zen::MakeErrorCodeFromLastError(); + Ec = MakeErrorCodeFromLastError(); return; } - if (IsCreate) + if (Mode != Mode::kRead) { fchmod(Fd, 0666); } @@ -268,7 +300,14 @@ BasicFile::FileSize() #if ZEN_PLATFORM_WINDOWS ULARGE_INTEGER liFileSize; liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); - + if (liFileSize.LowPart == INVALID_FILE_SIZE) + { + int Error = zen::GetLastError(); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle))); + } + } return uint64_t(liFileSize.QuadPart); #else int Fd = int(uintptr_t(m_FileHandle)); @@ -279,6 +318,102 @@ BasicFile::FileSize() #endif } +void +BasicFile::SetFileSize(uint64_t FileSize) +{ +#if ZEN_PLATFORM_WINDOWS + LARGE_INTEGER liFileSize; + liFileSize.QuadPart = FileSize; + BOOL OK = ::SetFilePointerEx(m_FileHandle, liFileSize, 0, FILE_BEGIN); + if (OK == FALSE) + { + int Error = zen::GetLastError(); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } + OK = ::SetEndOfFile(m_FileHandle); + if (OK == FALSE) + { + int Error = zen::GetLastError(); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } +#elif ZEN_PLATFORM_MAC + int Fd = int(intptr_t(m_FileHandle)); + if (ftruncate(Fd, (off_t)FileSize) < 0) + { + int Error = zen::GetLastError(); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } + if (FileSize > 0) + { + int Error = posix_fallocate(Fd, 0, (off_t)FileSize); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } +#else + int Fd = int(intptr_t(m_FileHandle)); + if (ftruncate64(Fd, (off64_t)FileSize) < 0) + { + int Error = zen::GetLastError(); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } + if (FileSize > 0) + { + int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize); + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + } + } +#endif +} + +void +BasicFile::MarkAsDeleteOnClose(std::error_code& Ec) +{ + Ec.clear(); +#if ZEN_PLATFORM_WINDOWS + FILE_DISPOSITION_INFO Fdi{}; + Fdi.DeleteFile = TRUE; + BOOL Success = SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + if (!Success) + { + Ec = MakeErrorCodeFromLastError(); + } +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(m_FileHandle); + if (unlink(SourcePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + Ec = MakeErrorCode(UnlinkError); + } + } +#endif +} + +void* +BasicFile::Detach() +{ + void* FileHandle = m_FileHandle; + m_FileHandle = 0; + return FileHandle; +} + ////////////////////////////////////////////////////////////////////////// TemporaryFile::~TemporaryFile() @@ -314,9 +449,7 @@ TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_cod m_TempPath = TempDirName / TempName.c_str(); - const bool IsCreate = true; - - Open(m_TempPath, IsCreate, Ec); + Open(m_TempPath, BasicFile::Mode::kTruncateDelete, Ec); } void @@ -416,8 +549,8 @@ TEST_CASE("BasicFile") ScopedCurrentDirectoryChange _; BasicFile File1; - CHECK_THROWS(File1.Open("zonk", false)); - CHECK_NOTHROW(File1.Open("zonk", true)); + CHECK_THROWS(File1.Open("zonk", BasicFile::Mode::kRead)); + CHECK_NOTHROW(File1.Open("zonk", BasicFile::Mode::kTruncate)); CHECK_NOTHROW(File1.Write("abcd", 4, 0)); CHECK(File1.FileSize() == 4); { diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp new file mode 100644 index 000000000..1eb859d5a --- /dev/null +++ b/zenstore/blockstore.cpp @@ -0,0 +1,242 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "compactcas.h" + +#include <zenstore/blockstore.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <algorithm> +# include <random> +#endif + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) +{ +} + +BlockStoreFile::~BlockStoreFile() +{ + m_IoBuffer = IoBuffer(); + m_File.Detach(); +} + +const std::filesystem::path& +BlockStoreFile::GetPath() const +{ + return m_Path; +} + +void +BlockStoreFile::Open() +{ + m_File.Open(m_Path, BasicFile::Mode::kDelete); + void* FileHandle = m_File.Handle(); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); +} + +void +BlockStoreFile::Create(uint64_t InitialSize) +{ + auto ParentPath = m_Path.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + CreateDirectories(ParentPath); + } + + m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete); + if (InitialSize > 0) + { + m_File.SetFileSize(InitialSize); + } + void* FileHandle = m_File.Handle(); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); +} + +uint64_t +BlockStoreFile::FileSize() +{ + return m_File.FileSize(); +} + +void +BlockStoreFile::MarkAsDeleteOnClose(std::error_code& Ec) +{ + m_File.MarkAsDeleteOnClose(Ec); +} + +IoBuffer +BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) +{ + return IoBuffer(m_IoBuffer, Offset, Size); +} + +void +BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Read(Data, Size, FileOffset); +} + +void +BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Write(Data, Size, FileOffset); +} + +void +BlockStoreFile::Truncate(uint64_t Size) +{ + m_File.SetFileSize(Size); +} + +void +BlockStoreFile::Flush() +{ + m_File.Flush(); +} + +void +BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) +{ + m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); +} + +#if ZEN_WITH_TESTS + +static bool +operator==(const BlockStoreLocation& Lhs, const BlockStoreLocation& Rhs) +{ + return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size; +} + +TEST_CASE("blockstore.blockstoredisklocation") +{ + BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; + CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); + + BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; + CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); + + BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); + + BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); + + BlockStoreLocation MaxBlockIndexAndOffset = + BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); + + BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4)); + + BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4096, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096)); + + BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2, + .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4, + .Size = std::numeric_limits<uint32_t>::max() / 2}; + CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4)); +} + +TEST_CASE("blockstore.blockfile") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path() / "blocks"; + CreateDirectories(RootDirectory); + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Create(16384); + CHECK(File1.FileSize() == 16384); + File1.Write("data", 5, 0); + IoBuffer DataChunk = File1.GetChunk(0, 5); + File1.Write("boop", 5, 5); + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + File1.Flush(); + } + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + + char DataRaw[5]; + File1.Read(DataRaw, 5, 0); + CHECK(std::string(DataRaw) == "data"); + IoBuffer DataChunk = File1.GetChunk(0, 5); + + char BoopRaw[5]; + File1.Read(BoopRaw, 5, 5); + CHECK(std::string(BoopRaw) == "boop"); + + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + CHECK(std::filesystem::exists(RootDirectory / "1")); + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(std::filesystem::exists(RootDirectory / "1")); + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + std::error_code Ec; + File1.MarkAsDeleteOnClose(Ec); + CHECK(!Ec); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(!std::filesystem::exists(RootDirectory / "1")); +} + +#endif + +void +blockstore_forcelink() +{ +} + +} // namespace zen diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index a90e45c04..0e1d5b242 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -150,8 +150,8 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) // Initialize payload storage m_LargeStrategy.Initialize(IsNewStore); - m_TinyStrategy.Initialize("tobs", 16, IsNewStore); - m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + m_TinyStrategy.Initialize("tobs", 1u << 28, 16, IsNewStore); // 256 Mb per block + m_SmallStrategy.Initialize("sobs", 1u << 30, 4096, IsNewStore); // 1 Gb per block } bool @@ -164,7 +164,7 @@ CasImpl::OpenOrCreateManifest() std::error_code Ec; BasicFile ManifestFile; - ManifestFile.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); + ManifestFile.Open(ManifestPath.c_str(), BasicFile::Mode::kRead, Ec); bool ManifestIsOk = false; @@ -236,7 +236,7 @@ CasImpl::UpdateManifest() ZEN_TRACE("Writing new manifest to '{}'", ManifestPath); BasicFile Marker; - Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Open(ManifestPath.c_str(), BasicFile::Mode::kTruncate); Marker.Write(m_ManifestObject.GetBuffer(), 0); } diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 055e3feda..03a56f010 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -39,13 +39,23 @@ CasLogFile::~CasLogFile() } void -CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreate) +CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode) { m_RecordSize = RecordSize; std::error_code Ec; - m_File.Open(FileName, IsCreate, Ec); + BasicFile::Mode FileMode = BasicFile::Mode::kRead; + switch (Mode) + { + case Mode::kWrite: + FileMode = BasicFile::Mode::kWrite; + break; + case Mode::kTruncate: + FileMode = BasicFile::Mode::kTruncate; + break; + } + m_File.Open(FileName, FileMode, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to open log file '{}'", FileName)); @@ -53,8 +63,12 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat uint64_t AppendOffset = 0; - if (IsCreate || (m_File.FileSize() < sizeof(FileHeader))) + if ((Mode == Mode::kTruncate) || (m_File.FileSize() < sizeof(FileHeader))) { + if (Mode == Mode::kRead) + { + throw std::runtime_error(fmt::format("Mangled log header (file to small) in '{}'", FileName)); + } // Initialize log by writing header FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic); @@ -106,20 +120,36 @@ CasLogFile::GetLogSize() return m_File.FileSize(); } +uint64_t +CasLogFile::GetLogCount() +{ + uint64_t LogFileSize = m_AppendOffset.load(std::memory_order_acquire); + if (LogFileSize < sizeof(FileHeader)) + { + return 0; + } + const uint64_t LogBaseOffset = sizeof(FileHeader); + const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; + return LogEntryCount; +} + void -CasLogFile::Replay(std::function<void(const void*)>&& Handler) +CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount) { uint64_t LogFileSize = m_File.FileSize(); // Ensure we end up on a clean boundary - const uint64_t LogBaseOffset = sizeof(FileHeader); - const size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; + uint64_t LogBaseOffset = sizeof(FileHeader); + size_t LogEntryCount = (LogFileSize - LogBaseOffset) / m_RecordSize; - if (LogEntryCount == 0) + if (LogEntryCount <= SkipEntryCount) { return; } + LogBaseOffset += SkipEntryCount * m_RecordSize; + LogEntryCount -= SkipEntryCount; + // This should really be streaming the data rather than just // reading it into memory, though we don't tend to get very // large logs so it may not matter @@ -142,7 +172,7 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) void CasLogFile::Append(const void* DataPointer, uint64_t DataSize) { - ZEN_ASSERT(DataSize == m_RecordSize); + ZEN_ASSERT((DataSize % m_RecordSize) == 0); uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize); diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 8b53a8304..509d21abe 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -127,35 +127,37 @@ struct CidStore::Impl bool IsNew = !std::filesystem::exists(SlogPath); - m_LogFile.Open(SlogPath, IsNew); + m_LogFile.Open(SlogPath, IsNew ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); ZEN_DEBUG("Initializing index from '{}' ({})", SlogPath, NiceBytes(m_LogFile.GetLogSize())); uint64_t TombstoneCount = 0; uint64_t InvalidCount = 0; - m_LogFile.Replay([&](const IndexEntry& Entry) { - if (Entry.Compressed != IoHash::Zero) - { - // Update - m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); - } - else - { - if (Entry.Uncompressed != IoHash::Zero) + m_LogFile.Replay( + [&](const IndexEntry& Entry) { + if (Entry.Compressed != IoHash::Zero) { - // Tombstone - m_CidMap.erase(Entry.Uncompressed); - ++TombstoneCount; + // Update + m_CidMap.insert_or_assign(Entry.Uncompressed, Entry.Compressed); } else { - // Completely uninitialized entry with both hashes set to zero indicates a - // problem. Might be an unwritten page due to BSOD or some other problem - ++InvalidCount; + if (Entry.Uncompressed != IoHash::Zero) + { + // Tombstone + m_CidMap.erase(Entry.Uncompressed); + ++TombstoneCount; + } + else + { + // Completely uninitialized entry with both hashes set to zero indicates a + // problem. Might be an unwritten page due to BSOD or some other problem + ++InvalidCount; + } } - } - }); + }, + 0); ZEN_INFO("CID index initialized: {} entries found ({} tombstones, {} invalid)", m_CidMap.size(), TombstoneCount, InvalidCount); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 3bf0c70df..51fe7a901 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -1,28 +1,24 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenstore/cas.h> - #include "compactcas.h" -#include <zencore/compactbinarybuilder.h> +#include <zenstore/cas.h> + #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/uid.h> - -#include <zenstore/gc.h> - -#include <filesystem> -#include <functional> +#include <zencore/scopeguard.h> #include <gsl/gsl-lite.hpp> +#include <xxhash.h> + #if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <zenstore/cidstore.h> # include <algorithm> # include <random> #endif @@ -31,6 +27,211 @@ namespace zen { +struct CasDiskIndexHeader +{ + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t PayloadAlignment = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const CasDiskIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(CasDiskIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } +}; + +static_assert(sizeof(CasDiskIndexHeader) == 32); + +namespace { + std::vector<CasDiskIndexEntry> MakeCasDiskEntries(const std::unordered_map<IoHash, BlockStoreDiskLocation>& MovedChunks, + const std::vector<IoHash>& DeletedChunks) + { + std::vector<CasDiskIndexEntry> result; + result.reserve(MovedChunks.size()); + for (const auto& MovedEntry : MovedChunks) + { + result.push_back({.Key = MovedEntry.first, .Location = MovedEntry.second}); + } + for (const IoHash& ChunkHash : DeletedChunks) + { + result.push_back({.Key = ChunkHash, .Flags = CasDiskIndexEntry::kTombstone}); + } + return result; + } + + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + const char* DataExtension = ".ucas"; + + std::filesystem::path GetBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / ContainerBaseName; + } + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + IndexExtension); + } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + ".tmp" + LogExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / (ContainerBaseName + LogExtension); + } + + std::filesystem::path GetBlocksBasePath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return GetBasePath(RootPath, ContainerBaseName) / "blocks"; + } + + std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) + { + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(DataExtension); + return Path.ToPath(); + } + + std::filesystem::path GetLegacyLogPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + LogExtension); + } + + std::filesystem::path GetLegacyDataPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + DataExtension); + } + + std::filesystem::path GetLegacyIndexPath(const std::filesystem::path& RootPath, const std::string& ContainerBaseName) + { + return RootPath / (ContainerBaseName + IndexExtension); + } + + struct LegacyCasDiskLocation + { + LegacyCasDiskLocation(uint64_t InOffset, uint64_t InSize) + { + ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); + ZEN_ASSERT(InSize <= 0xff'ffff'ffff); + + memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); + memcpy(&m_Size[0], &InSize, sizeof m_Size); + } + + LegacyCasDiskLocation() = default; + + inline uint64_t GetOffset() const + { + uint64_t Offset = 0; + memcpy(&Offset, &m_Offset, sizeof m_Offset); + return Offset; + } + + inline uint64_t GetSize() const + { + uint64_t Size = 0; + memcpy(&Size, &m_Size, sizeof m_Size); + return Size; + } + + private: + uint8_t m_Offset[5]; + uint8_t m_Size[5]; + }; + + struct LegacyCasDiskIndexEntry + { + static const uint8_t kTombstone = 0x01; + + IoHash Key; + LegacyCasDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; + }; + + bool ValidateLegacyEntry(const LegacyCasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~LegacyCasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + + bool ValidateEntry(const CasDiskIndexEntry& Entry, std::string& OutReason) + { + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if ((Entry.Flags & ~CasDiskIndexEntry::kTombstone) != 0) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + return true; + } + if (Entry.ContentType != ZenContentType::kUnknownContentType) + { + OutReason = + fmt::format("Invalid content type {} for entry {}", static_cast<uint8_t>(Entry.ContentType), Entry.Key.ToHexString()); + return false; + } + uint64_t Size = Entry.Location.GetSize(); + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc) : GcStorage(Gc) , m_Config(Config) @@ -43,13 +244,16 @@ CasContainerStrategy::~CasContainerStrategy() } void -CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) +CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore) { ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); + ZEN_ASSERT(MaxBlockSize > 0); m_ContainerBaseName = ContainerBaseName; m_PayloadAlignment = Alignment; + m_MaxBlockSize = MaxBlockSize; + m_BlocksBasePath = GetBlocksBasePath(m_Config.RootDirectory, m_ContainerBaseName); OpenContainer(IsNewStore); @@ -59,36 +263,79 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 CasStore::InsertResult CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash) { + uint32_t WriteBlockIndex; + Ref<BlockStoreFile> WriteBlock; + uint64_t InsertOffset; { - RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); + RwLock::ExclusiveLockScope _(m_InsertLock); - if (KeyIt != m_LocationMap.end()) { - return CasStore::InsertResult{.New = false}; + RwLock::SharedLockScope __(m_LocationMapLock); + if (m_LocationMap.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } } - } - - // New entry - - RwLock::ExclusiveLockScope _(m_InsertLock); - const uint64_t InsertOffset = m_CurrentInsertOffset; - m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset); + // New entry - m_CurrentInsertOffset = (m_CurrentInsertOffset + ChunkSize + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); - - RwLock::ExclusiveLockScope __(m_LocationMapLock); - - const CasDiskLocation Location{InsertOffset, ChunkSize}; - - m_LocationMap[ChunkHash] = Location; - - CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = m_WriteBlock != nullptr; + if (!IsWriting || (m_CurrentInsertOffset + ChunkSize) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + throw std::runtime_error( + fmt::format("unable to allocate a new block in '{}'", m_Config.RootDirectory / m_ContainerBaseName)); + } + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + m_WriteBlock = new BlockStoreFile(BlockPath); + m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + } + m_CurrentInsertOffset = 0; + m_WriteBlock->Create(m_MaxBlockSize); + } + InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + ChunkSize, m_PayloadAlignment); + WriteBlock = m_WriteBlock; + } - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize)); + // We can end up in a situation that InsertChunk writes the same chunk data in + // different locations. + // We release the insert lock once we have the correct WriteBlock ready and we know + // where to write the data. If a new InsertChunk request for the same chunk hash/data + // comes in before we update m_LocationMap below we will have a race. + // The outcome of that is that we will write the chunk data in more than one location + // but the chunk hash will only point to one of the chunks. + // We will in that case waste space until the next GC operation. + // + // This should be a rare occasion and the current flow reduces the time we block for + // reads, insert and GC. + + BlockStoreDiskLocation Location({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = ChunkSize}, m_PayloadAlignment); + const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location}; + + WriteBlock->Write(ChunkData, ChunkSize, InsertOffset); m_CasLog.Append(IndexEntry); + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_release); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + m_LocationMap.emplace(ChunkHash, Location); + } + return CasStore::InsertResult{.New = true}; } @@ -101,31 +348,28 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { - RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + Ref<BlockStoreFile> ChunkBlock; + BlockStoreLocation Location; { - const CasDiskLocation& Location = KeyIt->second; - - return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize()); + RwLock::SharedLockScope _(m_LocationMapLock); + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) + { + Location = KeyIt->second.Get(m_PayloadAlignment); + ChunkBlock = m_ChunkBlocks[Location.BlockIndex]; + } + else + { + return IoBuffer(); + } } - - // Not found - - return IoBuffer(); + return ChunkBlock->GetChunk(Location.Offset, Location.Size); } bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - - if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) - { - return true; - } - - return false; + return m_LocationMap.contains(ChunkHash); } void @@ -144,20 +388,23 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) void CasContainerStrategy::Flush() { - m_CasLog.Flush(); - m_SmallObjectIndex.Flush(); - m_SmallObjectFile.Flush(); + { + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + } + MakeIndexSnapshot(); } void CasContainerStrategy::Scrub(ScrubContext& Ctx) { - const uint64_t WindowSize = 4 * 1024 * 1024; - uint64_t WindowStart = 0; - uint64_t WindowEnd = WindowSize; - const uint64_t FileSize = m_SmallObjectFile.FileSize(); - - std::vector<CasDiskIndexEntry> BigChunks; std::vector<CasDiskIndexEntry> BadChunks; // We do a read sweep through the payloads file and validate @@ -166,62 +413,73 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) // pass. An alternative strategy would be to use memory mapping. { - IoBuffer ReadBuffer{WindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + std::vector<CasDiskIndexEntry> BigChunks; + const uint64_t WindowSize = 4 * 1024 * 1024; + IoBuffer ReadBuffer{WindowSize}; + void* BufferBase = ReadBuffer.MutableData(); - RwLock::SharedLockScope _(m_LocationMapLock); + RwLock::SharedLockScope _(m_InsertLock); // TODO: Refactor so we don't have to keep m_InsertLock all the time? + RwLock::SharedLockScope __(m_LocationMapLock); - do + for (const auto& Block : m_ChunkBlocks) { - const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); - m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const Ref<BlockStoreFile>& BlockFile = Block.second; + BlockFile->Open(); + const uint64_t FileSize = BlockFile->FileSize(); - for (auto& Entry : m_LocationMap) + do { - const uint64_t EntryOffset = Entry.second.GetOffset(); + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + BlockFile->Read(BufferBase, ChunkSize, WindowStart); - if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + for (auto& Entry : m_LocationMap) { - const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize(); + const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment); + const uint64_t EntryOffset = Location.Offset; - if (EntryEnd >= WindowEnd) + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) { - BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + const uint64_t EntryEnd = EntryOffset + Location.Size; - continue; - } + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); - const IoHash ComputedHash = - IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart, - Entry.second.GetSize()); + continue; + } - if (Entry.first != ComputedHash) - { - // Hash mismatch + const IoHash ComputedHash = + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart, Location.Size); - BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + if (Entry.first != ComputedHash) + { + // Hash mismatch + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second, .Flags = CasDiskIndexEntry::kTombstone}); + } } } - } - WindowStart += WindowSize; - WindowEnd += WindowSize; - } while (WindowStart < FileSize); - } - - // Deal with large chunks + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } - for (const CasDiskIndexEntry& Entry : BigChunks) - { - IoHashStream Hasher; - m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) { - Hasher.Append(Data, Size); - }); - IoHash ComputedHash = Hasher.GetHash(); + // Deal with large chunks - if (Entry.Key != ComputedHash) + for (const CasDiskIndexEntry& Entry : BigChunks) { - BadChunks.push_back(Entry); + IoHashStream Hasher; + const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment); + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex]; + BlockFile->StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + + if (Entry.Key != ComputedHash) + { + BadChunks.push_back({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); + } } } @@ -230,17 +488,21 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) return; } - ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); + ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_Config.RootDirectory / m_ContainerBaseName); // Deal with bad chunks by removing them from our lookup map std::vector<IoHash> BadChunkHashes; + BadChunkHashes.reserve(BadChunks.size()); - for (const CasDiskIndexEntry& Entry : BadChunks) + m_CasLog.Append(BadChunks); { - BadChunkHashes.push_back(Entry.Key); - m_CasLog.Append({.Key = Entry.Key, .Location = Entry.Location, .Flags = CasDiskIndexEntry::kTombstone}); - m_LocationMap.erase(Entry.Key); + RwLock::ExclusiveLockScope _(m_LocationMapLock); + for (const CasDiskIndexEntry& Entry : BadChunks) + { + BadChunkHashes.push_back(Entry.Key); + m_LocationMap.erase(Entry.Key); + } } // Let whomever it concerns know about the bad chunks. This could @@ -253,243 +515,1106 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) void CasContainerStrategy::CollectGarbage(GcContext& GcCtx) { - namespace fs = std::filesystem; - - // A naive garbage collection implementation that just copies evicted chunks - // into a new container file. We probably need to partition the container file - // into several parts to prevent needing to keep the entire container file during GC. + // It collects all the blocks that we want to delete chunks from. For each such + // block we keep a list of chunks to retain and a list of chunks to delete. + // + // If there is a block that we are currently writing to, that block is omitted + // from the garbage collection. + // + // Next it will iterate over all blocks that we want to remove chunks from. + // If the block is empty after removal of chunks we mark the block as pending + // delete - we want to delete it as soon as there are no IoBuffers using the + // block file. + // Once complete we update the m_LocationMap by removing the chunks. + // + // If the block is non-empty we write out the chunks we want to keep to a new + // block file (creating new block files as needed). + // + // We update the index as we complete each new block file. This makes it possible + // to break the GC if we want to limit time for execution. + // + // GC can fairly parallell to regular operation - it will block while taking + // a snapshot of the current m_LocationMap state. + // + // While moving blocks it will do a blocking operation and update the m_LocationMap + // after each new block is written and figuring out the path to the next new block. ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = 0; + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + std::vector<IoHash> DeletedChunks; + uint64_t MovedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([this, + &TotalTimer, + &WriteBlockTimeUs, + &WriteBlockLongestTimeUs, + &ReadBlockTimeUs, + &ReadBlockLongestTimeUs, + &TotalChunkCount, + &DeletedChunks, + &MovedCount, + &DeletedSize, + OldTotalSize] { + ZEN_INFO( + "garbage collect for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved " + "#{} " + "of #{} " + "chunks ({}).", + m_Config.RootDirectory / m_ContainerBaseName, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedChunks.size(), + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); - RwLock::ExclusiveLockScope _(m_LocationMapLock); + LocationMap_t LocationMap; + size_t BlockCount; + uint64_t ExcludeBlockIndex = 0x800000000ull; + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); + { + Stopwatch Timer; + const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_WriteBlock) + { + ExcludeBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + } + __.ReleaseNow(); + } + LocationMap = m_LocationMap; + BlockCount = m_ChunkBlocks.size(); + } + + if (LocationMap.empty()) + { + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_Config.RootDirectory / m_ContainerBaseName); + return; + } - Flush(); + TotalChunkCount = LocationMap.size(); - std::vector<IoHash> Candidates; - std::vector<IoHash> ChunksToKeep; - std::vector<IoHash> ChunksToDelete; - const uint64_t ChunkCount = m_LocationMap.size(); - uint64_t TotalSize{}; + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<std::vector<IoHash>> KeepChunks; + std::vector<std::vector<IoHash>> DeleteChunks; - Candidates.reserve(m_LocationMap.size()); + BlockIndexToChunkMapIndex.reserve(BlockCount); + KeepChunks.reserve(BlockCount); + DeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; - for (auto& Entry : m_LocationMap) + std::vector<IoHash> TotalChunkHashes; + TotalChunkHashes.reserve(TotalChunkCount); + for (const auto& Entry : LocationMap) { - Candidates.push_back(Entry.first); - TotalSize += Entry.second.GetSize(); + TotalChunkHashes.push_back(Entry.first); } - ChunksToKeep.reserve(Candidates.size()); - GcCtx.FilterCas(Candidates, [&ChunksToKeep, &ChunksToDelete](const IoHash& Hash, bool Keep) { + uint64_t DeleteCount = 0; + + uint64_t NewTotalSize = 0; + GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreDiskLocation& Location = KeyIt->second; + uint32_t BlockIndex = Location.GetBlockIndex(); + + if (static_cast<uint64_t>(BlockIndex) == ExcludeBlockIndex) + { + return; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = KeepChunks.size(); + BlockIndexToChunkMapIndex[BlockIndex] = ChunkMapIndex; + KeepChunks.resize(ChunkMapIndex + 1); + KeepChunks.back().reserve(GuesstimateCountPerBlock); + DeleteChunks.resize(ChunkMapIndex + 1); + DeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } if (Keep) { - ChunksToKeep.push_back(Hash); + std::vector<IoHash>& ChunkMap = KeepChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + NewTotalSize += Location.GetSize(); } else { - ChunksToDelete.push_back(Hash); + std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + ChunkMap.push_back(ChunkHash); + DeleteCount++; } }); - if (m_LocationMap.empty() || ChunksToKeep.size() == m_LocationMap.size()) + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) { - ZEN_INFO("garbage collect DONE, scanned #{} {} chunks from '{}', nothing to delete", - ChunkCount, - NiceBytes(TotalSize), - m_Config.RootDirectory / m_ContainerBaseName); + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const std::vector<IoHash>& ChunkMap = DeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + const bool PerformDelete = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + if (!PerformDelete) + { + uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed); + ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}", + m_Config.RootDirectory / m_ContainerBaseName, + DeleteCount, + NiceBytes(TotalSize - NewTotalSize), + TotalChunkCount, + NiceBytes(TotalSize)); return; } - const uint64_t NewChunkCount = ChunksToKeep.size(); - uint64_t NewTotalSize = 0; + // Move all chunks in blocks that have chunks removed to new blocks + + Ref<BlockStoreFile> NewBlockFile; + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + DeletedChunks.reserve(DeleteCount); - for (const IoHash& Key : ChunksToKeep) + auto UpdateLocations = [this](const std::span<CasDiskIndexEntry>& Entries) { + for (const CasDiskIndexEntry& Entry : Entries) + { + if (Entry.Flags & CasDiskIndexEntry::kTombstone) + { + auto KeyIt = m_LocationMap.find(Entry.Key); + uint64_t ChunkSize = KeyIt->second.GetSize(); + m_TotalSize.fetch_sub(ChunkSize); + m_LocationMap.erase(KeyIt); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } + }; + + std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks; + for (uint32_t BlockIndex : BlocksToReWrite) { - const CasDiskLocation& Loc = m_LocationMap[Key]; - NewTotalSize += Loc.GetSize(); + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_LocationMapLock); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } + + const std::vector<IoHash>& KeepMap = KeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries({}, DeleteMap); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope _i(m_LocationMapLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", + m_ContainerBaseName, + BlockIndex, + OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + continue; + } + + std::vector<uint8_t> Chunk; + for (const IoHash& ChunkHash : KeepMap) + { + auto KeyIt = LocationMap.find(ChunkHash); + const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, {}); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + } + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_Config.RootDirectory / m_ContainerBaseName, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex; + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + MovedCount += MovedBlockChunks.size(); + MovedBlockChunks.clear(); + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = GcCtx.ClaimGCReserve(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_LocationMapLock); + Stopwatch Timer; + const auto __ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_Config.RootDirectory / m_ContainerBaseName, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedBlockChunks.emplace( + ChunkHash, + BlockStoreDiskLocation({.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}, m_PayloadAlignment)); + WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Truncate(WriteOffset); + NewBlockFile->Flush(); + NewBlockFile = {}; + } + + const std::vector<IoHash>& DeleteMap = DeleteChunks[ChunkMapIndex]; + std::vector<CasDiskIndexEntry> LogEntries = MakeCasDiskEntries(MovedBlockChunks, DeleteMap); + m_CasLog.Append(LogEntries); + m_CasLog.Flush(); + { + RwLock::ExclusiveLockScope __(m_LocationMapLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + UpdateLocations(LogEntries); + m_ChunkBlocks[BlockIndex] = nullptr; + } + MovedCount += MovedBlockChunks.size(); + DeletedChunks.insert(DeletedChunks.end(), DeleteMap.begin(), DeleteMap.end()); + MovedBlockChunks.clear(); + + ZEN_DEBUG("marking cas store file in '{}' for delete , block #{}, '{}'", m_ContainerBaseName, BlockIndex, OldBlockFile->GetPath()); + std::error_code Ec; + OldBlockFile->MarkAsDeleteOnClose(Ec); + if (Ec) + { + ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message()); + } + OldBlockFile = nullptr; } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); - if (Error) + for (const IoHash& ChunkHash : DeletedChunks) { - ZEN_ERROR("get disk space FAILED, reason '{}'", Error.message()); - return; + DeletedSize += LocationMap[ChunkHash].GetSize(); } - if (Space.Free < NewTotalSize + (64 << 20)) - { - ZEN_INFO("garbage collect from '{}' FAILED, required disk space {}, free {}", + GcCtx.DeletedCas(DeletedChunks); +} + +void +CasContainerStrategy::MakeIndexSnapshot() +{ + ZEN_INFO("write store snapshot for '{}'", m_Config.RootDirectory / m_ContainerBaseName); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([this, &EntryCount, &Timer] { + ZEN_INFO("wrote store snapshot for '{}' containing #{} entries in {}", m_Config.RootDirectory / m_ContainerBaseName, - NiceBytes(NewTotalSize), - NiceBytes(Space.Free)); - return; - } + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - const bool CollectSmallObjects = GcCtx.IsDeletionMode() && GcCtx.CollectSmallObjects(); + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + fs::path TempIndexPath = GetTempIndexPath(m_Config.RootDirectory, m_ContainerBaseName); - if (!CollectSmallObjects) + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(TempIndexPath)) { - ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}", - m_Config.RootDirectory / m_ContainerBaseName, - ChunkCount - NewChunkCount, - NiceBytes(TotalSize - NewTotalSize), - ChunkCount, - NiceBytes(TotalSize)); - return; + fs::remove(TempIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, TempIndexPath); } - fs::path TmpSobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ucas"); - fs::path TmpSlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".gc.ulog"); - + try { - ZEN_DEBUG("creating temporary container cas '{}'...", TmpSobsPath); + m_CasLog.Flush(); + + // Write the current state of the location map to a new index state + uint64_t LogCount = 0; + std::vector<CasDiskIndexEntry> Entries; - TCasLogFile<CasDiskIndexEntry> TmpLog; - BasicFile TmpObjectFile; - bool IsNew = true; + { + RwLock::SharedLockScope __(m_InsertLock); + RwLock::SharedLockScope ___(m_LocationMapLock); + Entries.resize(m_LocationMap.size()); - TmpLog.Open(TmpSlogPath, IsNew); - TmpObjectFile.Open(TmpSobsPath, IsNew); + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } - std::vector<uint8_t> Chunk; - uint64_t NextInsertOffset{}; + LogCount = m_CasLog.GetLogCount(); + } - for (const IoHash& Key : ChunksToKeep) - { - const auto Entry = m_LocationMap.find(Key); - const auto& Loc = Entry->second; + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + CasDiskIndexHeader Header = {.EntryCount = Entries.size(), + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_PayloadAlignment)}; + + Header.Checksum = CasDiskIndexHeader::ComputeChecksum(Header); - Chunk.resize(Loc.GetSize()); - m_SmallObjectFile.Read(Chunk.data(), Chunk.size(), Loc.GetOffset()); + ObjectIndexFile.Write(&Header, sizeof(CasDiskIndexEntry), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexEntry)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - const uint64_t InsertOffset = NextInsertOffset; - TmpObjectFile.Write(Chunk.data(), Chunk.size(), InsertOffset); - TmpLog.Append({.Key = Key, .Location = {InsertOffset, Chunk.size()}}); + // Restore any previous snapshot - NextInsertOffset = (NextInsertOffset + Chunk.size() + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(IndexPath); + fs::rename(TempIndexPath, IndexPath); } } + if (fs::is_regular_file(TempIndexPath)) + { + fs::remove(TempIndexPath); + } +} - try +uint64_t +CasContainerStrategy::ReadIndexFile() +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(IndexPath)) { - CloseContainer(); + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' index containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); - fs::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); - fs::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); - fs::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == CasDiskIndexHeader::ExpectedMagic) && (Header.Version == CasDiskIndexHeader::CurrentVersion) && + (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && + (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + m_PayloadAlignment = Header.PayloadAlignment; - fs::remove(SobsPath); - fs::remove(SidxPath); - fs::remove(SlogPath); + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = Entry.Location; + } - fs::rename(TmpSobsPath, SobsPath); - fs::rename(TmpSlogPath, SlogPath); + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + } + return 0; +} + +uint64_t +CasContainerStrategy::ReadLog(uint64_t SkipEntryCount) +{ + std::vector<CasDiskIndexEntry> Entries; + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + if (std::filesystem::is_regular_file(LogPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([this, &Entries, &Timer] { + ZEN_INFO("read store '{}' log containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) { - // Create a new empty index file - BasicFile SidxFile; - SidxFile.Open(SidxPath, true); + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + uint64_t ReadCount = EntryCount - SkipEntryCount; + Entries.reserve(ReadCount); + CasLog.Replay( + [&](const CasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & CasDiskIndexEntry::kTombstone) + { + m_LocationMap.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + return; + } + m_LocationMap[Record.Key] = Record.Location; + }, + SkipEntryCount); + return ReadCount; } + } + return 0; +} - OpenContainer(false /* IsNewStore */); +uint64_t +CasContainerStrategy::MigrateLegacyData(bool CleanSource) +{ + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); - GcCtx.DeletedCas(ChunksToDelete); + if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0) + { + return 0; + } - ZEN_INFO("garbage collect from '{}' DONE, collected #{} {} chunks of total #{} {}", + ZEN_INFO("migrating store '{}'", m_Config.RootDirectory / m_ContainerBaseName); + + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacyIndexPath = GetLegacyIndexPath(m_Config.RootDirectory, m_ContainerBaseName); + + uint64_t MigratedChunkCount = 0; + uint32_t MigratedBlockCount = 0; + Stopwatch MigrationTimer; + uint64_t TotalSize = 0; + const auto _ = MakeGuard([this, &MigrationTimer, &MigratedChunkCount, &MigratedBlockCount, &TotalSize] { + ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})", m_Config.RootDirectory / m_ContainerBaseName, - ChunkCount - NewChunkCount, - NiceBytes(TotalSize - NewTotalSize), - ChunkCount, + MigratedChunkCount, + MigratedBlockCount, + NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()), NiceBytes(TotalSize)); + }); + + uint32_t WriteBlockIndex = 0; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; } - catch (std::exception& Err) + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_Config.RootDirectory, Error); + if (Error) { - ZEN_ERROR("garbage collection FAILED, reason '{}'", Err.what()); + ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", m_Config.RootDirectory, Error.message()); + return 0; + } + + if (Space.Free < m_MaxBlockSize) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", + m_Config.RootDirectory / m_ContainerBaseName, + m_MaxBlockSize, + NiceBytes(Space.Free)); + return 0; + } + + BasicFile BlockFile; + BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - // Something went wrong, try create a new container - OpenContainer(true /* IsNewStore */); + std::unordered_map<IoHash, LegacyCasDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex; + uint64_t InvalidEntryCount = 0; - GcCtx.DeletedCas(ChunksToDelete); - GcCtx.DeletedCas(ChunksToKeep); + TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; + LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead); + { + Stopwatch Timer; + const auto __ = MakeGuard([this, &LegacyDiskIndex, &Timer] { + ZEN_INFO("read store '{}' legacy log containing #{} entries in {}", + m_Config.RootDirectory / m_ContainerBaseName, + LegacyDiskIndex.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + if (LegacyCasLog.Initialize()) + { + LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount()); + LegacyCasLog.Replay( + [&](const LegacyCasDiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & LegacyCasDiskIndexEntry::kTombstone) + { + LegacyDiskIndex.erase(Record.Key); + return; + } + if (!ValidateLegacyEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason); + InvalidEntryCount++; + return; + } + LegacyDiskIndex.insert_or_assign(Record.Key, Record); + }, + 0); + + std::vector<IoHash> BadEntries; + uint64_t BlockFileSize = BlockFile.FileSize(); + for (const auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); + if (Record.Location.GetOffset() + Record.Location.GetSize() <= BlockFileSize) + { + continue; + } + ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath); + BadEntries.push_back(Entry.first); + } + for (const IoHash& BadHash : BadEntries) + { + LegacyDiskIndex.erase(BadHash); + } + InvalidEntryCount += BadEntries.size(); + } } -} -void -CasContainerStrategy::MakeSnapshot() -{ - RwLock::SharedLockScope _(m_LocationMapLock); + if (InvalidEntryCount) + { + ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_Config.RootDirectory / m_ContainerBaseName); + } - std::vector<CasDiskIndexEntry> Entries{m_LocationMap.size()}; + if (LegacyDiskIndex.empty()) + { + BlockFile.Close(); + LegacyCasLog.Close(); + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); + } + return 0; + } - uint64_t EntryIndex = 0; - for (auto& Entry : m_LocationMap) + for (const auto& Entry : LegacyDiskIndex) { - CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = Entry.second; + const LegacyCasDiskIndexEntry& Record(Entry.second); + TotalSize += Record.Location.GetSize(); } - m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); -} + uint64_t RequiredDiskSpace = TotalSize + ((m_PayloadAlignment - 1) * LegacyDiskIndex.size()); + uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, m_MaxBlockSize) / m_MaxBlockSize; + if (MaxRequiredBlockCount > BlockStoreDiskLocation::MaxBlockIndex) + { + ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", + m_Config.RootDirectory / m_ContainerBaseName, + MaxRequiredBlockCount, + BlockStoreDiskLocation::MaxBlockIndex); + return 0; + } -void -CasContainerStrategy::OpenContainer(bool IsNewStore) -{ - std::filesystem::path SobsPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ucas"); - std::filesystem::path SidxPath = m_Config.RootDirectory / (m_ContainerBaseName + ".uidx"); - std::filesystem::path SlogPath = m_Config.RootDirectory / (m_ContainerBaseName + ".ulog"); + constexpr const uint64_t DiskReserve = 1ul << 28; - m_SmallObjectFile.Open(SobsPath, IsNewStore); - m_SmallObjectIndex.Open(SidxPath, IsNewStore); - m_CasLog.Open(SlogPath, IsNewStore); + if (CleanSource) + { + if (Space.Free < (m_MaxBlockSize + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(m_MaxBlockSize + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } + else + { + if (Space.Free < (RequiredDiskSpace + DiskReserve)) + { + ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", + m_Config.RootDirectory / m_ContainerBaseName, + NiceBytes(RequiredDiskSpace + DiskReserve), + NiceBytes(Space.Free)); + return 0; + } + } - // TODO: should validate integrity of container files here + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + CreateDirectories(LogPath.parent_path()); + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - m_CurrentInsertOffset = 0; - m_CurrentIndexOffset = 0; - m_TotalSize = 0; + if (CleanSource && (MaxRequiredBlockCount < 2)) + { + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(LegacyDiskIndex.size()); - m_LocationMap.clear(); + // We can use the block as is, just move it and add the blocks to our new log + for (auto& Entry : LegacyDiskIndex) + { + const LegacyCasDiskIndexEntry& Record(Entry.second); - uint64_t MaxFileOffset = 0; + BlockStoreLocation NewChunkLocation(WriteBlockIndex, Record.Location.GetOffset(), Record.Location.GetSize()); + BlockStoreDiskLocation NewLocation(NewChunkLocation, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.second.Key, .Location = NewLocation, .ContentType = Record.ContentType, .Flags = Record.Flags}); + } + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + CreateDirectories(BlockPath.parent_path()); + BlockFile.Close(); + std::filesystem::rename(LegacyDataPath, BlockPath); + CasLog.Append(LogEntries); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } - m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { - if (Record.Flags & CasDiskIndexEntry::kTombstone) + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + } + else + { + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(LegacyDiskIndex.size()); + for (const auto& Entry : LegacyDiskIndex) { - m_TotalSize.fetch_sub(Record.Location.GetSize()); + ChunkHashes.push_back(Entry.first); } - else + + std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) { + auto LhsKeyIt = LegacyDiskIndex.find(Lhs); + auto RhsKeyIt = LegacyDiskIndex.find(Rhs); + return LhsKeyIt->second.Location.GetOffset() < RhsKeyIt->second.Location.GetOffset(); + }); + + uint64_t BlockSize = 0; + uint64_t BlockOffset = 0; + std::vector<BlockStoreLocation> NewLocations; + struct BlockData { - m_TotalSize.fetch_add(Record.Location.GetSize()); - m_LocationMap[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.GetOffset() + Record.Location.GetSize()); + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + uint64_t BlockOffset; + uint64_t BlockSize; + uint32_t BlockIndex; + }; + + std::vector<BlockData> BlockRanges; + std::vector<std::pair<IoHash, BlockStoreLocation>> Chunks; + BlockRanges.reserve(MaxRequiredBlockCount); + for (const IoHash& ChunkHash : ChunkHashes) + { + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[ChunkHash]; + const LegacyCasDiskLocation& LegacyChunkLocation = LegacyEntry.Location; + + uint64_t ChunkOffset = LegacyChunkLocation.GetOffset(); + uint64_t ChunkSize = LegacyChunkLocation.GetSize(); + uint64_t ChunkEnd = ChunkOffset + ChunkSize; + + if (BlockSize == 0) + { + BlockOffset = ChunkOffset; + } + if ((ChunkEnd - BlockOffset) > m_MaxBlockSize) + { + BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; + BlockRange.Chunks.swap(Chunks); + BlockRanges.push_back(BlockRange); + + WriteBlockIndex++; + while (std::filesystem::exists(GetBlockPath(m_BlocksBasePath, WriteBlockIndex))) + { + ++WriteBlockIndex; + } + BlockOffset = ChunkOffset; + BlockSize = 0; + } + BlockSize = RoundUp(BlockSize, m_PayloadAlignment); + BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; + Chunks.push_back({ChunkHash, ChunkLocation}); + BlockSize = ChunkEnd - BlockOffset; } - }); + if (BlockSize > 0) + { + BlockRanges.push_back( + {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); + } + Stopwatch WriteBlockTimer; + + std::reverse(BlockRanges.begin(), BlockRanges.end()); + std::vector<std::uint8_t> Buffer(1 << 28); + for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) + { + const BlockData& BlockRange = BlockRanges[Idx]; + if (Idx > 0) + { + uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; + uint64_t Completed = BlockOffset + BlockSize - Remaining; + uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; + + ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", + m_Config.RootDirectory / m_ContainerBaseName, + Idx, + BlockRanges.size(), + NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), + NiceBytes(BlockOffset + BlockSize), + NiceTimeSpanMs(ETA)); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockRange.BlockIndex); + BlockStoreFile ChunkBlock(BlockPath); + ChunkBlock.Create(BlockRange.BlockSize); + uint64_t Offset = 0; + while (Offset < BlockRange.BlockSize) + { + uint64_t Size = BlockRange.BlockSize - Offset; + if (Size > Buffer.size()) + { + Size = Buffer.size(); + } + BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); + ChunkBlock.Write(Buffer.data(), Size, Offset); + Offset += Size; + } + ChunkBlock.Truncate(Offset); + ChunkBlock.Flush(); + + std::vector<CasDiskIndexEntry> LogEntries; + LogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + const LegacyCasDiskIndexEntry& LegacyEntry = LegacyDiskIndex[Entry.first]; + BlockStoreDiskLocation Location(Entry.second, m_PayloadAlignment); + LogEntries.push_back( + {.Key = Entry.first, .Location = Location, .ContentType = LegacyEntry.ContentType, .Flags = LegacyEntry.Flags}); + } + CasLog.Append(LogEntries); + for (const CasDiskIndexEntry& Entry : LogEntries) + { + m_LocationMap.insert_or_assign(Entry.Key, Entry.Location); + } + MigratedChunkCount += LogEntries.size(); + MigratedBlockCount++; + + if (CleanSource) + { + std::vector<LegacyCasDiskIndexEntry> LegacyLogEntries; + LegacyLogEntries.reserve(BlockRange.Chunks.size()); + for (const auto& Entry : BlockRange.Chunks) + { + LegacyLogEntries.push_back({.Key = Entry.first, .Flags = LegacyCasDiskIndexEntry::kTombstone}); + } + LegacyCasLog.Append(LegacyLogEntries); + BlockFile.SetFileSize(BlockRange.BlockOffset); + } + } + } - m_CurrentInsertOffset = (MaxFileOffset + m_PayloadAlignment - 1) & ~(m_PayloadAlignment - 1); - m_CurrentIndexOffset = m_SmallObjectIndex.FileSize(); + BlockFile.Close(); + LegacyCasLog.Close(); + CasLog.Close(); + + if (CleanSource) + { + // Older versions of CasContainerStrategy expects the legacy files to exist if it can find + // a CAS manifest and crashes on startup if they don't. + // In order to not break startup when switching back an older version, lets just reset + // the legacy data files to zero length. + + BasicFile LegacyLog; + LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate); + BasicFile LegacySobs; + LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate); + BasicFile LegacySidx; + LegacySidx.Open(LegacyIndexPath, BasicFile::Mode::kTruncate); + } + return MigratedChunkCount; } void -CasContainerStrategy::CloseContainer() +CasContainerStrategy::OpenContainer(bool IsNewStore) { - m_SmallObjectFile.Close(); - m_SmallObjectIndex.Close(); - m_CasLog.Close(); + // Add .running file and delete on clean on close to detect bad termination + m_TotalSize = 0; + + m_LocationMap.clear(); + + std::filesystem::path BasePath = GetBasePath(m_Config.RootDirectory, m_ContainerBaseName); + + if (IsNewStore) + { + std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_Config.RootDirectory, m_ContainerBaseName); + std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_Config.RootDirectory, m_ContainerBaseName); + + std::filesystem::remove(LegacyLogPath); + std::filesystem::remove(LegacyDataPath); + std::filesystem::remove_all(BasePath); + } + + uint64_t LogPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(LogPosition); + uint64_t LegacyLogEntryCount = MigrateLegacyData(true); + + CreateDirectories(BasePath); + + std::filesystem::path LogPath = GetLogPath(m_Config.RootDirectory, m_ContainerBaseName); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : m_LocationMap) + { + const BlockStoreDiskLocation& Location = Entry.second; + m_TotalSize.fetch_add(Location.GetSize(), std::memory_order_release); + KnownBlocks.insert(Location.GetBlockIndex()); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != DataExtension) + { + continue; + } + std::string FileName = Path.stem().string(); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_INFO("removing unused block for '{}' at '{}'", m_ContainerBaseName, Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); + BlockFile->Open(); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } + + if (IsNewStore || ((LogEntryCount + LegacyLogEntryCount) > 0)) + { + MakeIndexSnapshot(); + } + + // TODO: should validate integrity of container files here } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS -TEST_CASE("cas.compact.gc") +namespace { + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace + +TEST_CASE("compactcas.hex") +{ + uint32_t Value; + std::string HexString; + CHECK(!ParseHexNumber("", Value)); + char Hex[9]; + + ToHexNumber(0u, Hex); + HexString = std::string(Hex); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0u); + + ToHexNumber(std::numeric_limits<std::uint32_t>::max(), Hex); + HexString = std::string(Hex); + CHECK(HexString == "ffffffff"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == std::numeric_limits<std::uint32_t>::max()); + + ToHexNumber(0xadf14711u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "adf14711"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0xadf14711u); + + ToHexNumber(0x80000000u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "80000000"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x80000000u); + + ToHexNumber(0x718293a4u, Hex); + HexString = std::string(Hex); + CHECK(HexString == "718293a4"); + CHECK(ParseHexNumber(HexString, Value)); + CHECK(Value == 0x718293a4u); +} + +TEST_CASE("compactcas.compact.gc") { ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); - CreateDirectories(CasConfig.RootDirectory); const int kIterationCount = 1000; @@ -499,7 +1624,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("test", 65536, 16, true); for (int i = 0; i < kIterationCount; ++i) { @@ -533,7 +1658,7 @@ TEST_CASE("cas.compact.gc") { CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 65536, 16, false); for (int i = 0; i < kIterationCount; ++i) { @@ -545,67 +1670,843 @@ TEST_CASE("cas.compact.gc") CHECK_EQ(Value["id"].AsInt32(), i); } - - GcContext Ctx; - Cas.CollectGarbage(Ctx); } } -TEST_CASE("cas.compact.totalsize") +TEST_CASE("compactcas.compact.totalsize") { std::random_device rd; std::mt19937 g(rd()); - const auto CreateChunk = [&](uint64_t Size) -> IoBuffer { - const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); - std::vector<uint32_t> Values; - Values.resize(Count); - for (size_t Idx = 0; Idx < Count; ++Idx) + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1024; + const int32_t kChunkCount = 16; + { - Values[Idx] = static_cast<uint32_t>(Idx); + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, true); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + const IoHash Hash = HashBuffer(Chunk); + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + } + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); } - std::shuffle(Values.begin(), Values.end(), g); - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); - }; + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + + // Re-open again, this time we should have a snapshot + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 65536, 16, false); + + const uint64_t TotalSize = Cas.StorageSize().DiskSize; + CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + } + } +} + +TEST_CASE("compactcas.gc.basic") +{ ScopedTemporaryDirectory TempDir; CasStoreConfiguration CasConfig; CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, true); + + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + Cas.Flush(); + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.removefile") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); CreateDirectories(CasConfig.RootDirectory); - const uint64_t kChunkSize = 1024; - const int32_t kChunkCount = 16; + IoBuffer Chunk = CreateChunk(128); + IoHash ChunkHash = IoHash::HashBuffer(Chunk); + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, true); + + const CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(InsertResult.New); + const CasStore::InsertResult InsertResultDup = Cas.InsertChunk(Chunk, ChunkHash); + CHECK(!InsertResultDup.New); + Cas.Flush(); + } + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("cb", 65536, 1 << 4, false); + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHash)); +} + +TEST_CASE("compactcas.gc.compact") +{ + // for (uint32_t i = 0; i < 100; ++i) { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, true); + Cas.Initialize("cb", 2048, 1 << 4, true); - for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(9); + for (uint64_t Size : ChunkSizes) { - IoBuffer Chunk = CreateChunk(kChunkSize); - const IoHash Hash = HashBuffer(Chunk); - auto InsertResult = Cas.InsertChunk(Chunk, Hash); - ZEN_ASSERT(InsertResult.New); + Chunks.push_back(CreateChunk(Size)); } - const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(9); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CHECK(Cas.InsertChunk(Chunks[0], ChunkHashes[0]).New); + CHECK(Cas.InsertChunk(Chunks[1], ChunkHashes[1]).New); + CHECK(Cas.InsertChunk(Chunks[2], ChunkHashes[2]).New); + CHECK(Cas.InsertChunk(Chunks[3], ChunkHashes[3]).New); + CHECK(Cas.InsertChunk(Chunks[4], ChunkHashes[4]).New); + CHECK(Cas.InsertChunk(Chunks[5], ChunkHashes[5]).New); + CHECK(Cas.InsertChunk(Chunks[6], ChunkHashes[6]).New); + CHECK(Cas.InsertChunk(Chunks[7], ChunkHashes[7]).New); + CHECK(Cas.InsertChunk(Chunks[8], ChunkHashes[8]).New); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + uint64_t InitialSize = Cas.StorageSize().DiskSize; + + // Keep first and last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + } + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(!Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(!Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(!Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(!Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(!Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[0], ChunkHashes[0]); + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Keep every other + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[2]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + CHECK(Cas.HaveChunk(ChunkHashes[0])); + CHECK(!Cas.HaveChunk(ChunkHashes[1])); + CHECK(Cas.HaveChunk(ChunkHashes[2])); + CHECK(!Cas.HaveChunk(ChunkHashes[3])); + CHECK(Cas.HaveChunk(ChunkHashes[4])); + CHECK(!Cas.HaveChunk(ChunkHashes[5])); + CHECK(Cas.HaveChunk(ChunkHashes[6])); + CHECK(!Cas.HaveChunk(ChunkHashes[7])); + CHECK(Cas.HaveChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + + uint64_t FinalSize = Cas.StorageSize().DiskSize; + CHECK(InitialSize == FinalSize); } +} + +TEST_CASE("compactcas.gc.deleteblockonopen") +{ + ScopedTemporaryDirectory TempDir; + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (uint64_t Size : ChunkSizes) { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + // GC every other block + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < 20; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcCtx.ContributeCas(KeepChunks); + + Cas.Flush(); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } + } + { + // Re-open + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, false); + + for (size_t i = 0; i < 20; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("compactcas.gc.handleopeniobuffer") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[20] = {128, 541, 311, 181, 218, 37, 4, 397, 5, 92, 551, 721, 31, 92, 16, 99, 131, 41, 541, 84}; + std::vector<IoBuffer> Chunks; + Chunks.reserve(20); + for (const uint64_t& Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(20); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 1024, 16, true); + + for (size_t i = 0; i < 20; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + IoBuffer RetainChunk = Cas.FindChunk(ChunkHashes[5]); + Cas.Flush(); + + // GC everything + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + Cas.CollectGarbage(GcCtx); + + for (size_t i = 0; i < 20; i++) + { + CHECK(!Cas.HaveChunk(ChunkHashes[i])); + } + + CHECK(ChunkHashes[5] == IoHash::HashBuffer(RetainChunk)); +} + +TEST_CASE("compactcas.legacyconversion") +{ + ScopedTemporaryDirectory TempDir; + + uint64_t ChunkSizes[] = {2041, 1123, 1223, 1239, 341, 1412, 912, 774, 341, 431, 554, 1098, 2048, 339, 561, 16, 16, 2048, 2048}; + size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t); + size_t SingleBlockSize = 0; + std::vector<IoBuffer> Chunks; + Chunks.reserve(ChunkCount); + for (uint64_t Size : ChunkSizes) + { + Chunks.push_back(CreateChunk(Size)); + SingleBlockSize += Size; + } + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(ChunkCount); + for (const IoBuffer& Chunk : Chunks) + { + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + CreateDirectories(CasConfig.RootDirectory); + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", gsl::narrow<uint32_t>(SingleBlockSize * 2), 16, true); + + for (size_t i = 0; i < ChunkCount; i++) + { + CHECK(Cas.InsertChunk(Chunks[i], ChunkHashes[i]).New); + } + + std::vector<IoHash> KeepChunks; + for (size_t i = 0; i < ChunkCount; i += 2) + { + KeepChunks.push_back(ChunkHashes[i]); + } + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepChunks); + Cas.Flush(); + Gc.CollectGarbage(GcCtx); + } + + std::filesystem::path BlockPath = GetBlockPath(GetBlocksBasePath(CasConfig.RootDirectory, "test"), 1); + std::filesystem::path LegacyDataPath = GetLegacyDataPath(CasConfig.RootDirectory, "test"); + std::filesystem::rename(BlockPath, LegacyDataPath); + + std::vector<CasDiskIndexEntry> LogEntries; + std::filesystem::path IndexPath = GetIndexPath(CasConfig.RootDirectory, "test"); + if (std::filesystem::is_regular_file(IndexPath)) + { + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(CasDiskIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CasDiskIndexHeader))) / sizeof(CasDiskIndexEntry); + CasDiskIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if (Header.Magic == CasDiskIndexHeader::ExpectedMagic && Header.Version == CasDiskIndexHeader::CurrentVersion && + Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount) + { + LogEntries.resize(Header.EntryCount); + ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + } + } + ObjectIndexFile.Close(); + std::filesystem::remove(IndexPath); + } + + std::filesystem::path LogPath = GetLogPath(CasConfig.RootDirectory, "test"); + { + TCasLogFile<CasDiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + LogEntries.reserve(CasLog.GetLogCount()); + CasLog.Replay([&](const CasDiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0); + } + TCasLogFile<LegacyCasDiskIndexEntry> LegacyCasLog; + std::filesystem::path LegacylogPath = GetLegacyLogPath(CasConfig.RootDirectory, "test"); + LegacyCasLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate); + + for (const CasDiskIndexEntry& Entry : LogEntries) + { + BlockStoreLocation Location = Entry.Location.Get(16); + LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size); + LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key, + .Location = LegacyLocation, + .ContentType = Entry.ContentType, + .Flags = Entry.Flags}; + LegacyCasLog.Append(LegacyEntry); + } + LegacyCasLog.Close(); + + std::filesystem::remove_all(CasConfig.RootDirectory / "test"); + + { + CasGc Gc; + CasContainerStrategy Cas(CasConfig, Gc); + Cas.Initialize("test", 2048, 16, false); + + for (size_t i = 0; i < ChunkCount; i += 2) + { + CHECK(Cas.HaveChunk(ChunkHashes[i])); + CHECK(!Cas.HaveChunk(ChunkHashes[i + 1])); + CHECK(ChunkHashes[i] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[i]))); + } + } +} + +TEST_CASE("compactcas.threadedinsert") // * doctest::skip(true)) +{ + // for (uint32_t i = 0; i < 100; ++i) + { + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path(); + + CreateDirectories(CasConfig.RootDirectory); + + const uint64_t kChunkSize = 1048; + const int32_t kChunkCount = 8192; + + std::vector<IoHash> ChunkHashes; + ChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> Chunks; + Chunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + ChunkHashes.emplace_back(Hash); + Chunks.emplace_back(Chunk); + } + + WorkerThreadPool ThreadPool(4); CasGc Gc; CasContainerStrategy Cas(CasConfig, Gc); - Cas.Initialize("test", 16, false); + Cas.Initialize("test", 32768, 16, true); + { + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = Chunks[Idx]; + const IoHash& Hash = ChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } const uint64_t TotalSize = Cas.StorageSize().DiskSize; CHECK_EQ(kChunkSize * kChunkCount, TotalSize); + + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + ThreadPool.ScheduleWork([&Cas, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(ChunkHash); + IoHash Hash = IoHash::HashBuffer(Chunk); + CHECK(ChunkHash == Hash); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } + + std::unordered_set<IoHash, IoHash::Hasher> GcChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + { + std::vector<IoHash> OldChunkHashes(ChunkHashes.begin(), ChunkHashes.end()); + std::vector<IoHash> NewChunkHashes; + NewChunkHashes.reserve(kChunkCount); + std::vector<IoBuffer> NewChunks; + NewChunks.reserve(kChunkCount); + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + IoBuffer Chunk = CreateChunk(kChunkSize); + IoHash Hash = HashBuffer(Chunk); + NewChunkHashes.emplace_back(Hash); + NewChunks.emplace_back(Chunk); + } + + RwLock ChunkHashesLock; + std::atomic_uint32_t AddedChunkCount; + + for (int32_t Idx = 0; Idx < kChunkCount; ++Idx) + { + const IoBuffer& Chunk = NewChunks[Idx]; + const IoHash& Hash = NewChunkHashes[Idx]; + ThreadPool.ScheduleWork([&Cas, Chunk, Hash, &AddedChunkCount]() { + CasStore::InsertResult InsertResult = Cas.InsertChunk(Chunk, Hash); + ZEN_ASSERT(InsertResult.New); + AddedChunkCount.fetch_add(1); + }); + ThreadPool.ScheduleWork([&Cas, &ChunkHashesLock, &OldChunkHashes, Idx]() { + IoHash ChunkHash = OldChunkHashes[Idx]; + IoBuffer Chunk = Cas.FindChunk(OldChunkHashes[Idx]); + if (Chunk) + { + CHECK(ChunkHash == IoHash::HashBuffer(Chunk)); + } + }); + } + + while (AddedChunkCount.load() < kChunkCount) + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 155 == 0) + { + if (C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + if (C + 3 < KeepHashes.size() - 1) + { + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + + { + std::vector<IoHash> AddedHashes; + { + RwLock::ExclusiveLockScope _(ChunkHashesLock); + AddedHashes.swap(NewChunkHashes); + } + // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope + for (const IoHash& ChunkHash : AddedHashes) + { + if (Cas.HaveChunk(ChunkHash)) + { + GcChunkHashes.emplace(ChunkHash); + } + } + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) + { + if (C % 77 == 0 && C < KeepHashes.size() - 1) + { + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); + } + C++; + } + + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + GcCtx.ContributeCas(KeepHashes); + Cas.CollectGarbage(GcCtx); + CasChunkSet& Deleted = GcCtx.DeletedCas(); + Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + } + } + { + for (const IoHash& ChunkHash : GcChunkHashes) + { + ThreadPool.ScheduleWork([&Cas, ChunkHash]() { + CHECK(Cas.HaveChunk(ChunkHash)); + CHECK(ChunkHash == IoHash::HashBuffer(Cas.FindChunk(ChunkHash))); + }); + } + while (ThreadPool.PendingWork() > 0) + { + Sleep(1); + } + } } } +TEST_CASE("compactcas.migrate.large.data" * doctest::skip(true)) +{ + const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; + std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); + std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); + std::filesystem::remove_all(TobsBasePath); + std::filesystem::remove_all(SobsBasePath); + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = BigDataPath; + uint64_t TObsSize = 0; + { + CasGc TobsCasGc; + CasContainerStrategy TobsCas(CasConfig, TobsCasGc); + TobsCas.Initialize("tobs", 1u << 28, 16, false); + TObsSize = TobsCas.StorageSize().DiskSize; + CHECK(TObsSize > 0); + } + + uint64_t SObsSize = 0; + { + CasGc SobsCasGc; + CasContainerStrategy SobsCas(CasConfig, SobsCasGc); + SobsCas.Initialize("sobs", 1u << 30, 4096, false); + SObsSize = SobsCas.StorageSize().DiskSize; + CHECK(SObsSize > 0); + } + + CasGc TobsCasGc; + CasContainerStrategy TobsCas(CasConfig, TobsCasGc); + TobsCas.Initialize("tobs", 1u << 28, 16, false); + GcContext TobsGcCtx; + TobsCas.CollectGarbage(TobsGcCtx); + CHECK(TobsCas.StorageSize().DiskSize == TObsSize); + + CasGc SobsCasGc; + CasContainerStrategy SobsCas(CasConfig, SobsCasGc); + SobsCas.Initialize("sobs", 1u << 30, 4096, false); + GcContext SobsGcCtx; + SobsCas.CollectGarbage(SobsGcCtx); + CHECK(SobsCas.StorageSize().DiskSize == SObsSize); +} + #endif void diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index c039feec9..11da37202 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -3,22 +3,13 @@ #pragma once #include <zencore/zencore.h> - -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/string.h> -#include <zencore/thread.h> -#include <zencore/uid.h> -#include <zenstore/basicfile.h> +#include <zenstore/blockstore.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> #include <zenstore/gc.h> -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - #include <atomic> +#include <limits> #include <unordered_map> namespace spdlog { @@ -32,46 +23,14 @@ namespace zen { #pragma pack(push) #pragma pack(1) -struct CasDiskLocation -{ - CasDiskLocation(uint64_t InOffset, uint64_t InSize) - { - ZEN_ASSERT(InOffset <= 0xff'ffff'ffff); - ZEN_ASSERT(InSize <= 0xff'ffff'ffff); - - memcpy(&m_Offset[0], &InOffset, sizeof m_Offset); - memcpy(&m_Size[0], &InSize, sizeof m_Size); - } - - CasDiskLocation() = default; - - inline uint64_t GetOffset() const - { - uint64_t Offset = 0; - memcpy(&Offset, &m_Offset, sizeof m_Offset); - return Offset; - } - - inline uint64_t GetSize() const - { - uint64_t Size = 0; - memcpy(&Size, &m_Size, sizeof m_Size); - return Size; - } - -private: - uint8_t m_Offset[5]; - uint8_t m_Size[5]; -}; - struct CasDiskIndexEntry { static const uint8_t kTombstone = 0x01; - IoHash Key; - CasDiskLocation Location; - ZenContentType ContentType = ZenContentType::kUnknownContentType; - uint8_t Flags = 0; + IoHash Key; + BlockStoreDiskLocation Location; + ZenContentType ContentType = ZenContentType::kUnknownContentType; + uint8_t Flags = 0; }; #pragma pack(pop) @@ -91,39 +50,46 @@ struct CasContainerStrategy final : public GcStorage CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc); ~CasContainerStrategy(); - CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(CasChunkSet& InOutChunks); - void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); + void Initialize(const std::string_view ContainerBaseName, uint32_t MaxBlockSize, uint64_t Alignment, bool IsNewStore); void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } + virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::acquire)}; } private: - void OpenContainer(bool IsNewStore); - void CloseContainer(); + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t SkipEntryCount); + uint64_t MigrateLegacyData(bool CleanSource); + void OpenContainer(bool IsNewStore); + spdlog::logger& Log() { return m_Log; } const CasStoreConfiguration& m_Config; spdlog::logger& m_Log; - uint64_t m_PayloadAlignment = 1 << 4; + uint64_t m_PayloadAlignment = 1u << 4; + uint64_t m_MaxBlockSize = 1u << 28; bool m_IsInitialized = false; - BasicFile m_SmallObjectFile; - BasicFile m_SmallObjectIndex; TCasLogFile<CasDiskIndexEntry> m_CasLog; std::string m_ContainerBaseName; + std::filesystem::path m_BlocksBasePath; + + RwLock m_LocationMapLock; + typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t; + LocationMap_t m_LocationMap; + std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; - RwLock m_LocationMapLock; - std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap; - RwLock m_InsertLock; // used to serialize inserts - std::atomic_uint64_t m_CurrentInsertOffset{}; - std::atomic_uint64_t m_CurrentIndexOffset{}; - std::atomic_uint64_t m_TotalSize{}; + RwLock m_InsertLock; // used to serialize inserts + Ref<BlockStoreFile> m_WriteBlock; + std::uint64_t m_CurrentInsertOffset = 0; - void MakeSnapshot(); + std::atomic_uint32_t m_WriteBlockIndex{}; + std::atomic_uint64_t m_TotalSize{}; }; void compactcas_forcelink(); diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 758c0665b..b53cfaa54 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -12,6 +12,7 @@ #include <zencore/testing.h> #include <zencore/testutils.h> #include <zencore/thread.h> +#include <zencore/timer.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> #include <zenstore/gc.h> @@ -88,18 +89,37 @@ FileCasStrategy::Initialize(bool IsNewStore) CreateDirectories(m_Config.RootDirectory); - m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore); + m_CasLog.Open(m_Config.RootDirectory / "cas.ulog", IsNewStore ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite); - m_CasLog.Replay([&](const FileCasIndexEntry& Entry) { - if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) - { - m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); - } - else - { - m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); - } + Stopwatch Timer; + const auto _ = MakeGuard([this, &Timer] { + ZEN_INFO("read log {} containing {}", m_Config.RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); + + std::unordered_set<IoHash> FoundEntries; + FoundEntries.reserve(10000); + m_CasLog.Replay( + [&](const FileCasIndexEntry& Entry) { + if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) + { + if (!FoundEntries.contains(Entry.Key)) + { + return; + } + m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); + FoundEntries.erase(Entry.Key); + } + else + { + if (FoundEntries.contains(Entry.Key)) + { + return; + } + FoundEntries.insert(Entry.Key); + m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); + } + }, + 0); } CasStore::InsertResult @@ -565,7 +585,7 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& BasicFile PayloadFile; std::error_code Ec; - PayloadFile.Open(Parent / File, false, Ec); + PayloadFile.Open(Parent / File, BasicFile::Mode::kWrite, Ec); if (!Ec) { @@ -668,6 +688,20 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) std::vector<IoHash> CandidateCas; + uint64_t DeletedCount = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + Stopwatch TotalTimer; + const auto _ = MakeGuard([this, &TotalTimer, &DeletedCount, &ChunkCount, OldTotalSize] { + ZEN_INFO("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", + m_Config.RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + DeletedCount, + ChunkCount, + NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), + NiceBytes(OldTotalSize)); + }); + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { bool KeepThis = false; CandidateCas.clear(); @@ -689,16 +723,17 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) ChunkBytes.fetch_add(FileSize); }); - ZEN_INFO("file CAS gc scanned: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); - if (ChunksToDelete.empty()) { - ZEN_INFO("nothing to delete"); - + ZEN_INFO("gc for '{}' SKIPPED, nothing to delete", m_Config.RootDirectory); return; } - ZEN_INFO("deleting file CAS garbage: {} chunks ({})", ChunksToDelete.size(), NiceBytes(ChunksToDeleteBytes)); + ZEN_INFO("deleting file CAS garbage for '{}': {} out of {} chunks ({})", + m_Config.RootDirectory, + ChunksToDelete.size(), + ChunkCount.load(), + NiceBytes(ChunksToDeleteBytes)); if (GcCtx.IsDeletionMode() == false) { @@ -716,8 +751,10 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) if (Ec) { - ZEN_WARN("failed to delete file for chunk {}: '{}'", Hash, Ec.message()); + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_Config.RootDirectory, Hash, Ec.message()); + continue; } + DeletedCount++; } GcCtx.DeletedCas(ChunksToDelete); @@ -747,7 +784,7 @@ TEST_CASE("cas.file.move") IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); BasicFile PayloadFile; - PayloadFile.Open(Payload1Path, true); + PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); PayloadFile.Write(ZeroBytes, 0); PayloadFile.Close(); diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 3b090cae9..287dfb48a 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -5,9 +5,11 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/testutils.h> @@ -18,6 +20,15 @@ #include <fmt/format.h> #include <filesystem> +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +#else +# include <fcntl.h> +# include <sys/file.h> +# include <sys/stat.h> +# include <unistd.h> +#endif + #if ZEN_WITH_TESTS # include <zencore/compress.h> # include <algorithm> @@ -31,6 +42,112 @@ namespace fs = std::filesystem; ////////////////////////////////////////////////////////////////////////// +namespace { + std::error_code CreateGCReserve(const std::filesystem::path& Path, uint64_t Size) + { + if (Size == 0) + { + std::filesystem::remove(Path); + return std::error_code{}; + } + CreateDirectories(Path.parent_path()); + if (std::filesystem::is_regular_file(Path) && std::filesystem::file_size(Path) == Size) + { + return std::error_code(); + } +#if ZEN_PLATFORM_WINDOWS + DWORD dwCreationDisposition = CREATE_ALWAYS; + DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; + + const DWORD dwShareMode = 0; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + HANDLE hTemplateFile = nullptr; + + HANDLE FileHandle = CreateFile(Path.c_str(), + dwDesiredAccess, + dwShareMode, + /* lpSecurityAttributes */ nullptr, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + return MakeErrorCodeFromLastError(); + } + bool Keep = true; + auto _ = MakeGuard([FileHandle, &Keep, Path]() { + ::CloseHandle(FileHandle); + if (!Keep) + { + ::DeleteFile(Path.c_str()); + } + }); + LARGE_INTEGER liFileSize; + liFileSize.QuadPart = Size; + BOOL OK = ::SetFilePointerEx(FileHandle, liFileSize, 0, FILE_BEGIN); + if (!OK) + { + return MakeErrorCodeFromLastError(); + } + OK = ::SetEndOfFile(FileHandle); + if (!OK) + { + return MakeErrorCodeFromLastError(); + } + Keep = true; +#else + int OpenFlags = O_CLOEXEC | O_RDWR | O_CREAT; + int Fd = open(Path.c_str(), OpenFlags, 0666); + if (Fd < 0) + { + return MakeErrorCodeFromLastError(); + } + + bool Keep = true; + auto _ = MakeGuard([Fd, &Keep, Path]() { + close(Fd); + if (!Keep) + { + unlink(Path.c_str()); + } + }); + + if (fchmod(Fd, 0666) < 0) + { + return MakeErrorCodeFromLastError(); + } + +# if ZEN_PLATFORM_MAC + if (ftruncate(Fd, (off_t)Size) < 0) + { + return MakeErrorCodeFromLastError(); + } + int Error = posix_fallocate(Fd, 0, (off_t)Size); + if (Error) + { + return MakeErrorCode(Error); + } +# else + if (ftruncate64(Fd, (off64_t)Size) < 0) + { + return MakeErrorCodeFromLastError(); + } + int Error = posix_fallocate64(Fd, 0, (off64_t)Size); + if (Error) + { + return MakeErrorCode(Error); + } +# endif + Keep = true; +#endif + return std::error_code{}; + } + +} // namespace + +////////////////////////////////////////////////////////////////////////// + CbObject LoadCompactBinaryObject(const fs::path& Path) { @@ -74,6 +191,8 @@ struct GcContext::GcState GcClock::Duration m_MaxCacheDuration = std::chrono::hours(24); bool m_DeletionMode = true; bool m_CollectSmallObjects = false; + + std::filesystem::path DiskReservePath; }; GcContext::GcContext(GcClock::TimePoint Time) : m_State(std::make_unique<GcState>()) @@ -194,6 +313,27 @@ GcContext::MaxCacheDuration(GcClock::Duration Duration) m_State->m_MaxCacheDuration = Duration; } +void +GcContext::DiskReservePath(const std::filesystem::path& Path) +{ + m_State->DiskReservePath = Path; +} + +uint64_t +GcContext::ClaimGCReserve() +{ + if (!std::filesystem::is_regular_file(m_State->DiskReservePath)) + { + return 0; + } + uint64_t ReclaimedSize = std::filesystem::file_size(m_State->DiskReservePath); + if (std::filesystem::remove(m_State->DiskReservePath)) + { + return ReclaimedSize; + } + return 0; +} + ////////////////////////////////////////////////////////////////////////// GcContributor::GcContributor(CasGc& Gc) : m_Gc(Gc) @@ -262,10 +402,13 @@ CasGc::CollectGarbage(GcContext& GcCtx) RwLock::SharedLockScope _(m_Lock); // First gather reference set - - for (GcContributor* Contributor : m_GcContribs) { - Contributor->GatherReferences(GcCtx); + Stopwatch Timer; + const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("gathered references in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + for (GcContributor* Contributor : m_GcContribs) + { + Contributor->GatherReferences(GcCtx); + } } // Cache records reference CAS chunks with the uncompressed @@ -300,15 +443,22 @@ CasGc::CollectGarbage(GcContext& GcCtx) // Then trim storage - for (GcStorage* Storage : m_GcStorage) { - Storage->CollectGarbage(GcCtx); + Stopwatch Timer; + const auto Guard = MakeGuard([this, &Timer] { ZEN_INFO("collected garbage in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + for (GcStorage* Storage : m_GcStorage) + { + Storage->CollectGarbage(GcCtx); + } } // Remove Cid to CAS hash mappings. Scrub? if (CidStore* CidStore = m_CidStore) { + Stopwatch Timer; + const auto Guard = + MakeGuard([this, &Timer] { ZEN_INFO("clean up deleted content ids in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); CidStore->RemoveCids(GcCtx.DeletedCas()); } } @@ -379,6 +529,15 @@ GcScheduler::Initialize(const GcSchedulerConfig& Config) std::filesystem::create_directories(Config.RootDirectory); + std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + if (Ec) + { + ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason '{}'", + m_Config.RootDirectory / "reserve.gc", + NiceBytes(m_Config.DiskReserveSize), + Ec.message()); + } + m_LastGcTime = GcClock::Now(); if (CbObject SchedulerState = LoadCompactBinaryObject(Config.RootDirectory / "gc_state")) @@ -475,7 +634,7 @@ GcScheduler::SchedulerThread() if (Ec) { - ZEN_WARN("get disk space info FAILED, reason '{}'", Ec.message()); + ZEN_WARN("get disk space info FAILED, reason: '{}'", Ec.message()); } ZEN_INFO("{} in use, {} of total {} free disk space, {}", @@ -506,6 +665,7 @@ GcScheduler::SchedulerThread() GcCtx.SetDeletionMode(true); GcCtx.CollectSmallObjects(m_Config.CollectSmallObjects); GcCtx.MaxCacheDuration(m_Config.MaxCacheDuration); + GcCtx.DiskReservePath(m_Config.RootDirectory / "reserve.gc"); if (m_TriggerParams) { @@ -519,27 +679,37 @@ GcScheduler::SchedulerThread() } } - Stopwatch Timer; - ZEN_INFO("garbage collection STARTING, small objects gc {}, max cache duration {}", GcCtx.CollectSmallObjects() ? "ENABLED"sv : "DISABLED"sv, NiceTimeSpanMs(uint64_t(std::chrono::duration_cast<std::chrono::milliseconds>(GcCtx.MaxCacheDuration()).count()))); + { + Stopwatch Timer; + const auto __ = + MakeGuard([this, &Timer] { ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_CasGc.CollectGarbage(GcCtx); + m_CasGc.CollectGarbage(GcCtx); - m_LastGcTime = GcClock::Now(); - m_NextGcTime = NextGcTime(m_LastGcTime); - WaitTime = m_Config.MonitorInterval; + m_LastGcTime = GcClock::Now(); + m_NextGcTime = NextGcTime(m_LastGcTime); + WaitTime = m_Config.MonitorInterval; - { - const fs::path Path = m_Config.RootDirectory / "gc_state"; - ZEN_DEBUG("saving scheduler state to '{}'", Path); - CbObjectWriter SchedulderState; - SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); - SaveCompactBinaryObject(Path, SchedulderState.Save()); - } + { + const fs::path Path = m_Config.RootDirectory / "gc_state"; + ZEN_DEBUG("saving scheduler state to '{}'", Path); + CbObjectWriter SchedulderState; + SchedulderState << "LastGcTime"sv << static_cast<int64_t>(m_LastGcTime.time_since_epoch().count()); + SaveCompactBinaryObject(Path, SchedulderState.Save()); + } - ZEN_INFO("garbage collection DONE after {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + std::error_code Ec = CreateGCReserve(m_Config.RootDirectory / "reserve.gc", m_Config.DiskReserveSize); + if (Ec) + { + ZEN_WARN("unable to create GC reserve at '{}' with size {}, reason: '{}'", + m_Config.RootDirectory / "reserve.gc", + NiceBytes(m_Config.DiskReserveSize), + Ec.message()); + } + } uint32_t RunningState = static_cast<uint32_t>(GcSchedulerStatus::kRunning); if (!m_Status.compare_exchange_strong(RunningState, static_cast<uint32_t>(GcSchedulerStatus::kIdle))) @@ -573,16 +743,15 @@ namespace { static std::random_device rd; static std::mt19937 g(rd()); - const size_t Count = static_cast<size_t>(Size / sizeof(uint32_t)); - std::vector<uint32_t> Values; - Values.resize(Count); - for (size_t Idx = 0; Idx < Count; ++Idx) + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) { - Values[Idx] = static_cast<uint32_t>(Idx); + Values[Idx] = static_cast<uint8_t>(Idx); } std::shuffle(Values.begin(), Values.end(), g); - return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size() * sizeof(uint32_t)); + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); } static CompressedBuffer Compress(IoBuffer Buffer) @@ -613,11 +782,209 @@ TEST_CASE("gc.basic") GcContext GcCtx; GcCtx.CollectSmallObjects(true); + CasStore->Flush(); Gc.CollectGarbage(GcCtx); CHECK(!CidStore.ContainsChunk(InsertResult.DecompressedId)); } +TEST_CASE("gc.full") +{ + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration CasConfig; + CasConfig.RootDirectory = TempDir.Path() / "cas"; + + CasGc Gc; + std::unique_ptr<CasStore> CasStore = CreateCasStore(Gc); + + CasStore->Initialize(CasConfig); + + uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5}; + IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]), + CreateChunk(ChunkSizes[1]), + CreateChunk(ChunkSizes[2]), + CreateChunk(ChunkSizes[3]), + CreateChunk(ChunkSizes[4]), + CreateChunk(ChunkSizes[5]), + CreateChunk(ChunkSizes[6]), + CreateChunk(ChunkSizes[7]), + CreateChunk(ChunkSizes[8])}; + IoHash ChunkHashes[9] = { + IoHash::HashBuffer(Chunks[0].Data(), Chunks[0].Size()), + IoHash::HashBuffer(Chunks[1].Data(), Chunks[1].Size()), + IoHash::HashBuffer(Chunks[2].Data(), Chunks[2].Size()), + IoHash::HashBuffer(Chunks[3].Data(), Chunks[3].Size()), + IoHash::HashBuffer(Chunks[4].Data(), Chunks[4].Size()), + IoHash::HashBuffer(Chunks[5].Data(), Chunks[5].Size()), + IoHash::HashBuffer(Chunks[6].Data(), Chunks[6].Size()), + IoHash::HashBuffer(Chunks[7].Data(), Chunks[7].Size()), + IoHash::HashBuffer(Chunks[8].Data(), Chunks[8].Size()), + }; + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); + + CasStoreSize InitialSize = CasStore->TotalSize(); + + // Keep first and last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[0]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + } + + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + + // Keep last + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[7], ChunkHashes[7]); + } + + // Keep mixed + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[1]); + KeepChunks.push_back(ChunkHashes[4]); + KeepChunks.push_back(ChunkHashes[7]); + GcCtx.ContributeCas(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + CasStore->InsertChunk(Chunks[6], ChunkHashes[6]); + CasStore->InsertChunk(Chunks[8], ChunkHashes[8]); + } + + // Keep multiple at end + { + GcContext GcCtx; + GcCtx.CollectSmallObjects(true); + std::vector<IoHash> KeepChunks; + KeepChunks.push_back(ChunkHashes[6]); + KeepChunks.push_back(ChunkHashes[7]); + KeepChunks.push_back(ChunkHashes[8]); + GcCtx.ContributeCas(KeepChunks); + + CasStore->Flush(); + Gc.CollectGarbage(GcCtx); + + CHECK(!CasStore->ContainsChunk(ChunkHashes[0])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[1])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[2])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[3])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[4])); + CHECK(!CasStore->ContainsChunk(ChunkHashes[5])); + CHECK(CasStore->ContainsChunk(ChunkHashes[6])); + CHECK(CasStore->ContainsChunk(ChunkHashes[7])); + CHECK(CasStore->ContainsChunk(ChunkHashes[8])); + + CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + CasStore->InsertChunk(Chunks[0], ChunkHashes[0]); + CasStore->InsertChunk(Chunks[1], ChunkHashes[1]); + CasStore->InsertChunk(Chunks[2], ChunkHashes[2]); + CasStore->InsertChunk(Chunks[3], ChunkHashes[3]); + CasStore->InsertChunk(Chunks[4], ChunkHashes[4]); + CasStore->InsertChunk(Chunks[5], ChunkHashes[5]); + } + + // Verify that we nicely appended blocks even after all GC operations + CHECK(ChunkHashes[0] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[0]))); + CHECK(ChunkHashes[1] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[1]))); + CHECK(ChunkHashes[2] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[2]))); + CHECK(ChunkHashes[3] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[3]))); + CHECK(ChunkHashes[4] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[4]))); + CHECK(ChunkHashes[5] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[5]))); + CHECK(ChunkHashes[6] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[6]))); + CHECK(ChunkHashes[7] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[7]))); + CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); + + auto FinalSize = CasStore->TotalSize(); + CHECK(InitialSize.TinySize == FinalSize.TinySize); +} #endif void diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h index 2df016c76..5a500c65f 100644 --- a/zenstore/include/zenstore/basicfile.h +++ b/zenstore/include/zenstore/basicfile.h @@ -31,8 +31,17 @@ public: BasicFile(const BasicFile&) = delete; BasicFile& operator=(const BasicFile&) = delete; - void Open(std::filesystem::path FileName, bool IsCreate); - void Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec); + enum class Mode : uint32_t + { + kRead = 0, // Opens a existing file for read only + kWrite = 1, // Opens (or creates) a file for read and write + kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero + kDelete = 3, // Opens (or creates) a file for read and write enabling MarkAsDeleteOnClose() + kTruncateDelete = 4 // Opens (or creates) a file for read and write and sets the size to zero enabling MarkAsDeleteOnClose() + }; + + void Open(const std::filesystem::path& FileName, Mode Mode); + void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec); void Close(); void Read(void* Data, uint64_t Size, uint64_t FileOffset); void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); @@ -43,13 +52,17 @@ public: void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); void Flush(); uint64_t FileSize(); + void SetFileSize(uint64_t FileSize); IoBuffer ReadAll(); void WriteAll(IoBuffer Data, std::error_code& Ec); + void MarkAsDeleteOnClose(std::error_code& Ec); + void* Detach(); inline void* Handle() { return m_FileHandle; } protected: void* m_FileHandle = nullptr; // This is either null or valid +private: }; /** diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h new file mode 100644 index 000000000..424db461a --- /dev/null +++ b/zenstore/include/zenstore/blockstore.h @@ -0,0 +1,104 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/filesystem.h> +#include <zencore/zencore.h> +#include <zenstore/basicfile.h> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +struct BlockStoreLocation +{ + uint32_t BlockIndex; + uint64_t Offset; + uint64_t Size; +}; + +#pragma pack(push) +#pragma pack(1) + +struct BlockStoreDiskLocation +{ + constexpr static uint32_t MaxBlockIndexBits = 20; + constexpr static uint32_t MaxOffsetBits = 28; + constexpr static uint32_t MaxBlockIndex = (1ul << BlockStoreDiskLocation::MaxBlockIndexBits) - 1ul; + constexpr static uint32_t MaxOffset = (1ul << BlockStoreDiskLocation::MaxOffsetBits) - 1ul; + + BlockStoreDiskLocation(const BlockStoreLocation& Location, uint64_t OffsetAlignment) + { + Init(Location.BlockIndex, Location.Offset / OffsetAlignment, Location.Size); + } + + BlockStoreDiskLocation() = default; + + inline BlockStoreLocation Get(uint64_t OffsetAlignment) const + { + uint64_t PackedOffset = 0; + memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); + return {.BlockIndex = static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits), + .Offset = (PackedOffset & MaxOffset) * OffsetAlignment, + .Size = GetSize()}; + } + + inline uint32_t GetBlockIndex() const + { + uint64_t PackedOffset = 0; + memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); + return static_cast<std::uint32_t>(PackedOffset >> MaxOffsetBits); + } + + inline uint64_t GetOffset(uint64_t OffsetAlignment) const + { + uint64_t PackedOffset = 0; + memcpy(&PackedOffset, &m_Offset, sizeof m_Offset); + return (PackedOffset & MaxOffset) * OffsetAlignment; + } + + inline uint64_t GetSize() const { return m_Size; } + +private: + inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size) + { + ZEN_ASSERT(BlockIndex <= MaxBlockIndex); + ZEN_ASSERT(Offset <= MaxOffset); + ZEN_ASSERT(Size <= std::numeric_limits<std::uint32_t>::max()); + + m_Size = static_cast<uint32_t>(Size); + uint64_t PackedOffset = (static_cast<uint64_t>(BlockIndex) << MaxOffsetBits) + Offset; + memcpy(&m_Offset[0], &PackedOffset, sizeof m_Offset); + } + + uint32_t m_Size; + uint8_t m_Offset[6]; +}; + +#pragma pack(pop) + +struct BlockStoreFile : public RefCounted +{ + explicit BlockStoreFile(const std::filesystem::path& BlockPath); + ~BlockStoreFile(); + const std::filesystem::path& GetPath() const; + void Open(); + void Create(uint64_t InitialSize); + void MarkAsDeleteOnClose(std::error_code& Ec); + uint64_t FileSize(); + IoBuffer GetChunk(uint64_t Offset, uint64_t Size); + void Read(void* Data, uint64_t Size, uint64_t FileOffset); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset); + void Truncate(uint64_t Size); + void Flush(); + void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + +private: + const std::filesystem::path m_Path; + IoBuffer m_IoBuffer; + BasicFile m_File; +}; + +void blockstore_forcelink(); + +} // namespace zen diff --git a/zenstore/include/zenstore/cas.h b/zenstore/include/zenstore/cas.h index 7a7233c1c..5592fbd0a 100644 --- a/zenstore/include/zenstore/cas.h +++ b/zenstore/include/zenstore/cas.h @@ -69,7 +69,7 @@ public: private: // Q: should we protect this with a lock, or is that a higher level concern? - std::unordered_set<IoHash> m_ChunkSet; + std::unordered_set<IoHash, IoHash::Hasher> m_ChunkSet; }; /** Context object for data scrubbing diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 1bd11800c..4b93a708f 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -4,18 +4,8 @@ #include "zenstore.h" -#include <zencore/iobuffer.h> -#include <zencore/string.h> -#include <zencore/thread.h> #include <zencore/uid.h> #include <zenstore/basicfile.h> -#include <zenstore/cas.h> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - -#include <functional> namespace zen { @@ -25,12 +15,20 @@ public: CasLogFile(); ~CasLogFile(); - void Open(std::filesystem::path FileName, size_t RecordSize, bool isCreate); + enum class Mode + { + kRead, + kWrite, + kTruncate + }; + + void Open(std::filesystem::path FileName, size_t RecordSize, Mode Mode); void Append(const void* DataPointer, uint64_t DataSize); - void Replay(std::function<void(const void*)>&& Handler); + void Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntryCount); void Flush(); void Close(); uint64_t GetLogSize(); + uint64_t GetLogCount(); private: struct FileHeader @@ -51,6 +49,8 @@ private: static_assert(sizeof(FileHeader) == 64); private: + void Open(std::filesystem::path FileName, size_t RecordSize, BasicFile::Mode Mode); + BasicFile m_File; FileHeader m_Header; size_t m_RecordSize = 1; @@ -61,18 +61,20 @@ template<typename T> class TCasLogFile : public CasLogFile { public: - void Open(std::filesystem::path FileName, bool IsCreate) { CasLogFile::Open(FileName, sizeof(T), IsCreate); } + void Open(std::filesystem::path FileName, Mode Mode) { CasLogFile::Open(FileName, sizeof(T), Mode); } // This should be called before the Replay() is called to do some basic sanity checking bool Initialize() { return true; } - void Replay(Invocable<const T&> auto Handler) + void Replay(Invocable<const T&> auto Handler, uint64_t SkipEntryCount) { - CasLogFile::Replay([&](const void* VoidPtr) { - const T& Record = *reinterpret_cast<const T*>(VoidPtr); + CasLogFile::Replay( + [&](const void* VoidPtr) { + const T& Record = *reinterpret_cast<const T*>(VoidPtr); - Handler(Record); - }); + Handler(Record); + }, + SkipEntryCount); } void Append(const T& Record) @@ -82,6 +84,8 @@ public: CasLogFile::Append(&Record, sizeof Record); } + + void Append(const std::span<T>& Records) { CasLogFile::Append(Records.data(), sizeof(T) * Records.size()); } }; } // namespace zen diff --git a/zenstore/include/zenstore/gc.h b/zenstore/include/zenstore/gc.h index b8ba338f0..bc8dee9a3 100644 --- a/zenstore/include/zenstore/gc.h +++ b/zenstore/include/zenstore/gc.h @@ -78,6 +78,9 @@ public: GcClock::Duration MaxCacheDuration() const; void MaxCacheDuration(GcClock::Duration Duration); + void DiskReservePath(const std::filesystem::path& Path); + uint64_t ClaimGCReserve(); + inline bool Expired(GcClock::Tick TickCount) { return Time() - GcClock::TimePointFromTick(TickCount) > MaxCacheDuration(); } private: @@ -170,6 +173,7 @@ struct GcSchedulerConfig std::chrono::seconds MaxCacheDuration{86400}; bool CollectSmallObjects = true; bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; }; /** diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp index dbb3dbbf7..5f40b7f60 100644 --- a/zenstore/zenstore.cpp +++ b/zenstore/zenstore.cpp @@ -3,6 +3,7 @@ #include "zenstore/zenstore.h" #include <zenstore/basicfile.h> +#include <zenstore/blockstore.h> #include <zenstore/cas.h> #include <zenstore/gc.h> #include "compactcas.h" @@ -16,6 +17,7 @@ zenstore_forcelinktests() basicfile_forcelink(); CAS_forcelink(); filecas_forcelink(); + blockstore_forcelink(); compactcas_forcelink(); gc_forcelink(); } |