aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore/filecas.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp1452
1 files changed, 1452 insertions, 0 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
new file mode 100644
index 000000000..1d25920c4
--- /dev/null
+++ b/src/zenstore/filecas.cpp
@@ -0,0 +1,1452 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "filecas.h"
+
+#include <zencore/compress.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/memory.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+#include <zencore/thread.h>
+#include <zencore/timer.h>
+#include <zencore/uid.h>
+#include <zenstore/gc.h>
+#include <zenstore/scrubcontext.h>
+#include <zenutil/basicfile.h>
+
+#if ZEN_WITH_TESTS
+# include <zencore/compactbinarybuilder.h>
+#endif
+
+#include <gsl/gsl-lite.hpp>
+
+#include <barrier>
+#include <filesystem>
+#include <functional>
+#include <unordered_map>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <xxhash.h>
+#if ZEN_PLATFORM_WINDOWS
+# include <atlfile.h>
+#endif
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+namespace filecas::impl {
+	// File name extensions of the persisted index snapshot and of the log file
+	const char* IndexExtension = ".uidx";
+	const char* LogExtension   = ".ulog";
+
+	// Path of the index snapshot file ("cas" + IndexExtension) below RootDir
+	std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir)
+	{
+		const std::string FileName = fmt::format("cas{}", IndexExtension);
+		return RootDir / FileName;
+	}
+
+	// Path of the temporary index file ("cas.tmp" + IndexExtension) below RootDir
+	std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir)
+	{
+		const std::string FileName = fmt::format("cas.tmp{}", IndexExtension);
+		return RootDir / FileName;
+	}
+
+	// Path of the log file ("cas" + LogExtension) below RootDir
+	std::filesystem::path GetLogPath(const std::filesystem::path& RootDir)
+	{
+		const std::string FileName = fmt::format("cas{}", LogExtension);
+		return RootDir / FileName;
+	}
+
+#pragma pack(push)
+#pragma pack(1)
+
+	// Fixed-size (32 byte) header stored at the start of the index snapshot file.
+	// The entry array follows directly after the header.
+	struct FileCasIndexHeader
+	{
+		static constexpr uint32_t ExpectedMagic	 = 0x75696478;	// 'uidx';
+		static constexpr uint32_t CurrentVersion = 1;
+
+		uint32_t Magic		 = ExpectedMagic;
+		uint32_t Version	 = CurrentVersion;
+		uint64_t EntryCount	 = 0;  // number of index entries following the header
+		uint64_t LogPosition = 0;  // log entry count at the time the snapshot was written
+		uint32_t Reserved	 = 0;
+		uint32_t Checksum	 = 0;  // checksum of all preceding header bytes
+
+		// Hashes every header byte except the trailing Checksum field itself
+		static uint32_t ComputeChecksum(const FileCasIndexHeader& Header)
+		{
+			constexpr size_t HashedByteCount = sizeof(FileCasIndexHeader) - sizeof(uint32_t);
+			return XXH32(&Header.Magic, HashedByteCount, 0xC0C0'BABA);
+		}
+	};
+
+	static_assert(sizeof(FileCasIndexHeader) == 32);
+
+#pragma pack(pop)
+
+}  // namespace filecas::impl
+
+// Builds the on-disk path for a chunk:
+//
+//   <RootPath>/<hex digits 0..2>/<hex digits 3..4>/<hex digits 5..39>
+//
+// Shard2len records the length of the path up to and including the second
+// shard directory, i.e. the directory which will contain the payload file.
+FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
+{
+	ExtendableStringBuilder<64> HexBuilder;
+	ChunkHash.ToHexString(HexBuilder);
+
+	const char* const Hex		  = HexBuilder.c_str();
+	const char* const FirstLevel  = Hex + 3;
+	const char* const SecondLevel = Hex + 5;
+	const char* const HashEnd	  = Hex + 40;
+
+	// The path is split into two directory levels holding 12 bits and 8 bits
+	// of the hash respectively, for a maximum of 4096 * 256 directories.
+	//
+	// These numbers were picked somewhat arbitrarily, but they are large enough
+	// to scale to very large chunk repositories without putting too many
+	// directories on a single level, which NTFS does not handle very well.
+	//
+	// Making this a configurable policy may or may not be worthwhile; measuring
+	// performance for different policies and chunk counts would be a good idea.
+
+	ShardedPath.Append(RootPath.c_str());
+
+	ShardedPath.AppendSeparator();
+	ShardedPath.AppendAsciiRange(Hex, FirstLevel);
+
+	ShardedPath.AppendSeparator();
+	ShardedPath.AppendAsciiRange(FirstLevel, SecondLevel);
+	Shard2len = ShardedPath.Size();
+
+	ShardedPath.AppendSeparator();
+	ShardedPath.AppendAsciiRange(SecondLevel, HashEnd);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+// Registers with the GC manager through the GcStorage base and binds the
+// "filecas" logger. The store is not usable until Initialize() has been called.
+FileCasStrategy::FileCasStrategy(GcManager& Gc)
+: GcStorage(Gc)
+, m_Log(logging::Get("filecas"))
+{
+}
+
+// Nothing to release explicitly; members clean up after themselves
+FileCasStrategy::~FileCasStrategy() = default;
+
+// Prepares the store rooted at RootDirectory for use.
+//
+// RootDirectory - directory holding the sharded payload folders, the index
+//                 snapshot and the log
+// IsNewStore    - when true, any existing log, index snapshot and sharded
+//                 payload folders are deleted before the store is opened
+//
+// The in-memory index is rebuilt from the index snapshot and the log, the log
+// is opened for writing, and a fresh snapshot is written if the store is new
+// or if any log entries were read.
+void
+FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore)
+{
+	using namespace filecas::impl;
+
+	m_IsInitialized = true;
+
+	m_RootDirectory = RootDirectory;
+
+	// Start from a clean in-memory state. The total size is re-accumulated from the
+	// index below, so it must be reset together with the index or a repeated call
+	// would count every entry twice
+	m_Index.clear();
+	m_TotalSize.store(0, std::memory_order::relaxed);
+
+	std::filesystem::path LogPath	= GetLogPath(m_RootDirectory);
+	std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
+
+	if (IsNewStore)
+	{
+		std::filesystem::remove(LogPath);
+		std::filesystem::remove(IndexPath);
+
+		if (std::filesystem::is_directory(m_RootDirectory))
+		{
+			// We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside
+			// in this folder as well
+			struct Visitor : public FileSystemTraversal::TreeVisitor
+			{
+				virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override
+				{
+					// We don't care about files
+				}
+				static bool IsHexChar(std::filesystem::path::value_type C)
+				{
+					return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16];
+				}
+				// Collects directories whose name is exactly three hex characters, which is
+				// the shape of a first level shard folder. Returns false for every directory.
+				virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
+											[[maybe_unused]] const path_view&			  DirectoryName) override
+				{
+					if (DirectoryName.length() == 3)
+					{
+						if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2]))
+						{
+							ShardedRoots.push_back(Parent / DirectoryName);
+						}
+					}
+					return false;
+				}
+				std::vector<std::filesystem::path> ShardedRoots;
+			} CasVisitor;
+
+			FileSystemTraversal Traversal;
+			Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor);
+			for (const std::filesystem::path& ShardedRoot : CasVisitor.ShardedRoots)
+			{
+				std::filesystem::remove_all(ShardedRoot);
+			}
+		}
+	}
+
+	// Rebuild the index: snapshot first, then replay the log entries written after it
+	m_LogFlushPosition	   = ReadIndexFile();
+	uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
+	for (const auto& Entry : m_Index)
+	{
+		m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed);
+	}
+
+	CreateDirectories(m_RootDirectory);
+	m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
+
+	if (IsNewStore || LogEntryCount > 0)
+	{
+		MakeIndexSnapshot();
+	}
+}
+
+#if ZEN_PLATFORM_WINDOWS
+// Flags the file behind FileHandle for deletion once its last handle is closed.
+// Failure is not fatal; it is only reported as a warning.
+static void
+DeletePayloadFileOnClose(const void* FileHandle)
+{
+	const HANDLE NativeHandle = const_cast<void*>(FileHandle);
+
+	// With this disposition set the file is deleted when the last handle to it is closed
+	FILE_DISPOSITION_INFO Disposition{};
+	Disposition.DeleteFile = TRUE;
+
+	if (SetFileInformationByHandle(NativeHandle, FileDispositionInfo, &Disposition, sizeof Disposition))
+	{
+		return;
+	}
+
+	// TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
+	// to delete it.
+	ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'",
+			 PathFromHandle(NativeHandle),
+			 GetLastErrorAsString());
+}
+#endif
+
+// Inserts a chunk held in an IoBuffer.
+//
+// Chunk     - payload to store
+// ChunkHash - content hash identifying the chunk
+// Mode      - kCopyOnly always copies the payload bytes; otherwise a chunk which
+//             is backed by a whole file is moved (Windows) or hard linked (POSIX)
+//             into the store to avoid copying
+//
+// Returns an InsertResult whose New member is true when the chunk was added to
+// the index by this call. If moving/linking the file fails for a reason other
+// than "target already exists", the payload is written via the raw data overload.
+CasStore::InsertResult
+FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+#if !ZEN_WITH_TESTS
+	ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
+#endif
+
+	// Adds the chunk to the in-memory index and, if it was not present before, accounts
+	// for its size. Takes the index lock, so callers release the per-hash lock first.
+	auto AddToIndex = [&](uint64_t Size) -> CasStore::InsertResult {
+		bool IsNew = false;
+		{
+			RwLock::ExclusiveLockScope IndexLock(m_Lock);
+			IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Size}}).second;
+		}
+		if (IsNew)
+		{
+			m_TotalSize.fetch_add(Size, std::memory_order::relaxed);
+		}
+		return CasStore::InsertResult{.New = IsNew};
+	};
+
+	if (Mode == CasStore::InsertMode::kCopyOnly)
+	{
+		{
+			RwLock::SharedLockScope _(m_Lock);
+			if (m_Index.contains(ChunkHash))
+			{
+				return CasStore::InsertResult{.New = false};
+			}
+		}
+		return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
+	}
+
+	// File-based chunks have special case handling whereby we move the file into
+	// place in the file store directory, thus avoiding unnecessary copying
+
+	IoBufferFileReference FileRef;
+	if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
+	{
+		{
+			bool Exists = true;
+			{
+				RwLock::SharedLockScope _(m_Lock);
+				Exists = m_Index.contains(ChunkHash);
+			}
+			if (Exists)
+			{
+				// Already stored, all that is left to do is to get rid of the source file
+#if ZEN_PLATFORM_WINDOWS
+				DeletePayloadFileOnClose(FileRef.FileHandle);
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+				std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
+				if (unlink(FilePath.c_str()) < 0)
+				{
+					int UnlinkError = zen::GetLastError();
+					if (UnlinkError != ENOENT)
+					{
+						ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+								 FilePath.string(),
+								 GetSystemErrorAsString(UnlinkError));
+					}
+				}
+#endif
+				return CasStore::InsertResult{.New = false};
+			}
+		}
+
+		ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+		RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
+
+#if ZEN_PLATFORM_WINDOWS
+		const HANDLE ChunkFileHandle = FileRef.FileHandle;
+		// See if file already exists
+		{
+			CAtlFile PayloadFile;
+
+			if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
+			{
+				// If we succeeded in opening the target file then we don't need to do anything else because it already exists
+				// and should contain the content we were about to insert
+
+				// We do need to ensure the source file goes away on close, however
+				size_t	 ChunkSize = Chunk.GetSize();
+				uint64_t FileSize  = 0;
+				if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
+				{
+					HashLock.ReleaseNow();
+
+					const CasStore::InsertResult Result = AddToIndex(ChunkSize);
+
+					DeletePayloadFileOnClose(ChunkFileHandle);
+
+					return Result;
+				}
+				else
+				{
+					ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+							 Name.ShardedPath.ToUtf8(),
+							 ChunkSize,
+							 FileSize);
+				}
+			}
+			else
+			{
+				if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+				{
+					// Shard directory does not exist
+				}
+				else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
+				{
+					// Shard directory exists, but not the file
+				}
+				else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
+				{
+					// Sharing violation, likely because we are trying to open a file
+					// which has been renamed on another thread, and the file handle
+					// used to rename it is still open. We handle this case below
+					// instead of here
+				}
+				else
+				{
+					ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes);
+				}
+			}
+		}
+
+		std::filesystem::path FullPath(Name.ShardedPath.c_str());
+
+		std::filesystem::path FilePath = FullPath.parent_path();
+		std::wstring		  FileName = FullPath.native();
+
+		// FILE_RENAME_INFO ends in a one element WCHAR array, which provides the room
+		// for the terminating null character
+		const DWORD		  FileNameBytes = gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR));
+		const DWORD		  BufferSize	= sizeof(FILE_RENAME_INFO) + FileNameBytes;
+		FILE_RENAME_INFO* RenameInfo	= reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize));
+		memset(RenameInfo, 0, BufferSize);
+
+		RenameInfo->ReplaceIfExists = FALSE;
+		// FileNameLength is specified in bytes, not including the null terminator
+		RenameInfo->FileNameLength = FileNameBytes;
+		memcpy(RenameInfo->FileName, FileName.c_str(), FileNameBytes);
+		RenameInfo->FileName[FileName.size()] = 0;
+
+		auto FreeRenameInfo = MakeGuard([&] { Memory::Free(RenameInfo); });
+
+		// Try to move file into place
+		BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
+
+		if (!Success)
+		{
+			// The rename/move could fail because the target directory does not yet exist. This code attempts
+			// to create it
+
+			CAtlFile DirHandle;
+
+			auto InternalCreateDirectoryHandle = [&] {
+				return DirHandle.Create(FilePath.c_str(),
+										GENERIC_READ | GENERIC_WRITE,
+										FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+										OPEN_EXISTING,
+										FILE_FLAG_BACKUP_SEMANTICS);
+			};
+
+			// It's possible for several threads to enter this logic trying to create the same
+			// directory. Only one will create the directory of course, but all threads will
+			// make it through okay
+
+			HRESULT hRes = InternalCreateDirectoryHandle();
+
+			if (FAILED(hRes))
+			{
+				// TODO: we can handle directory creation more intelligently and efficiently than
+				// this currently does
+
+				CreateDirectories(FilePath.c_str());
+
+				hRes = InternalCreateDirectoryHandle();
+			}
+
+			if (FAILED(hRes))
+			{
+				ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath));
+			}
+
+			// Retry rename/move
+
+			Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
+		}
+
+		if (Success)
+		{
+			m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
+			HashLock.ReleaseNow();
+
+			return AddToIndex(Chunk.Size());
+		}
+
+		const DWORD LastError = GetLastError();
+
+		if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
+		{
+			HashLock.ReleaseNow();
+			DeletePayloadFileOnClose(ChunkFileHandle);
+
+			return AddToIndex(Chunk.Size());
+		}
+
+		ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
+				 GetSystemErrorAsString(LastError),
+				 ChunkHash);
+
+		DeletePayloadFileOnClose(ChunkFileHandle);
+
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+		std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
+		std::filesystem::path DestPath	 = Name.ShardedPath.c_str();
+		int					  Ret		 = link(SourcePath.c_str(), DestPath.c_str());
+		if (Ret < 0 && zen::GetLastError() == ENOENT)
+		{
+			// Destination directory doesn't exist. Create it and try again.
+			CreateDirectories(DestPath.parent_path().c_str());
+			Ret = link(SourcePath.c_str(), DestPath.c_str());
+		}
+		// Capture the link error now, the unlink call below may overwrite it
+		int LinkError = zen::GetLastError();
+
+		if (unlink(SourcePath.c_str()) < 0)
+		{
+			int UnlinkError = zen::GetLastError();
+			if (UnlinkError != ENOENT)
+			{
+				ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+						 SourcePath.string(),
+						 GetSystemErrorAsString(UnlinkError));
+			}
+		}
+
+		// It is possible that someone beat us to it in linking the file. In that
+		// case a "file exists" error is okay. All others are not.
+		if (Ret < 0)
+		{
+			if (LinkError == EEXIST)
+			{
+				HashLock.ReleaseNow();
+				return AddToIndex(Chunk.Size());
+			}
+
+			ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
+					 GetSystemErrorAsString(LinkError),
+					 ChunkHash);
+		}
+		else
+		{
+			// We put the payload file in place, so record the insert in the log just
+			// like the Windows rename path does. Without this the chunk would be
+			// missing from the index when it is next rebuilt from snapshot + log
+			m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+
+			HashLock.ReleaseNow();
+			return AddToIndex(Chunk.Size());
+		}
+#endif	// ZEN_PLATFORM_*
+	}
+
+	return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
+}
+
+// Inserts a chunk from a raw memory range by writing it to its sharded payload file.
+//
+// ChunkData - start of the payload bytes
+// ChunkSize - number of payload bytes
+// ChunkHash - content hash identifying the chunk
+//
+// Returns an InsertResult whose New member is true when the chunk was added to
+// the index by this call. Throws if the payload file cannot be created.
+CasStore::InsertResult
+FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	{
+		RwLock::SharedLockScope _(m_Lock);
+		if (m_Index.contains(ChunkHash))
+		{
+			return {.New = false};
+		}
+	}
+
+	// Adds the chunk to the in-memory index and, if it was not present before, accounts
+	// for its size. Takes the index lock, so callers release the per-hash lock first.
+	auto AddToIndex = [&]() -> CasStore::InsertResult {
+		bool IsNew = false;
+		{
+			RwLock::ExclusiveLockScope IndexLock(m_Lock);
+			IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+		}
+		if (IsNew)
+		{
+			m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+		}
+		return CasStore::InsertResult{.New = IsNew};
+	};
+
+	ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+	// See if file already exists
+
+#if ZEN_PLATFORM_WINDOWS
+	CAtlFile PayloadFile;
+
+	HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+
+	if (SUCCEEDED(hRes))
+	{
+		// If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
+		// content we were about to insert
+
+		return AddToIndex();
+	}
+
+	PayloadFile.Close();
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+	if (access(Name.ShardedPath.c_str(), F_OK) == 0)
+	{
+		// The payload file is already present but the index did not know about it.
+		// Register it, as the Windows path above does, so that HaveChunk/FindChunk
+		// can see the chunk once this call has returned
+		return AddToIndex();
+	}
+#endif
+
+	RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
+
+#if ZEN_PLATFORM_WINDOWS
+	// For now, use double-checked locking to see if someone else was first
+
+	hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+
+	if (SUCCEEDED(hRes))
+	{
+		uint64_t FileSize = 0;
+		if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
+		{
+			// If we succeeded in opening the file and the size is correct we don't need to do anything
+			// else because someone else managed to create the file before we did. Just return.
+
+			HashLock.ReleaseNow();
+			return AddToIndex();
+		}
+		else
+		{
+			ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+					 Name.ShardedPath.ToUtf8(),
+					 ChunkSize,
+					 FileSize);
+		}
+	}
+
+	if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
+	{
+		ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
+	}
+
+	auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
+
+	hRes = InternalCreateFile();
+
+	if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+	{
+		// Ensure parent directories exist and retry file creation
+		CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
+		hRes = InternalCreateFile();
+	}
+
+	if (FAILED(hRes))
+	{
+		ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8()));
+	}
+#else
+	// Attempt to exclusively create the file.
+	auto InternalCreateFile = [&] {
+		int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
+		if (Fd >= 0)
+		{
+			fchmod(Fd, 0666);
+		}
+		return Fd;
+	};
+	int Fd = InternalCreateFile();
+	if (Fd < 0)
+	{
+		switch (zen::GetLastError())
+		{
+			case EEXIST:
+				// Another thread has beat us to it so we're golden.
+				HashLock.ReleaseNow();
+				return AddToIndex();
+
+			case ENOENT:
+				if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
+				{
+					Fd = InternalCreateFile();
+					if (Fd >= 0)
+					{
+						break;
+					}
+				}
+				ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));
+
+			default:
+				ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
+		}
+	}
+
+	// Minimal file descriptor owner offering the same Write/Close calls the
+	// Windows file object is used with below
+	struct FdWrapper
+	{
+		~FdWrapper() { Close(); }
+		// NOTE(review): the result of write() is discarded, so failed or short writes go unnoticed
+		void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); }
+		void Close()
+		{
+			if (Fd >= 0)
+			{
+				close(Fd);
+				Fd = -1;
+			}
+		}
+		int Fd;
+	} PayloadFile = {Fd};
+#endif	// ZEN_PLATFORM_WINDOWS
+
+	size_t ChunkRemain = ChunkSize;
+	auto   ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData);
+
+	// Write the payload in pieces of at most 4 MiB
+	while (ChunkRemain != 0)
+	{
+		uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain));
+
+		PayloadFile.Write(ChunkCursor, ByteCount);
+
+		ChunkCursor += ByteCount;
+		ChunkRemain -= ByteCount;
+	}
+
+	// We cannot rely on RAII to close the file handle since it would be closed
+	// *after* the lock is released due to the initialization order
+	PayloadFile.Close();
+
+	m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+
+	HashLock.ReleaseNow();
+
+	return AddToIndex();
+}
+
+// Looks up a chunk by hash. Returns an empty buffer when the chunk is not in the
+// index, otherwise a buffer created from the chunk's payload file.
+IoBuffer
+FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	// The index lock is only held for the duration of the lookup
+	bool IsIndexed = false;
+	{
+		RwLock::SharedLockScope IndexLock(m_Lock);
+		IsIndexed = m_Index.contains(ChunkHash);
+	}
+
+	if (!IsIndexed)
+	{
+		return {};
+	}
+
+	ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+	// Hold the per-hash lock (shared) while the payload file is opened
+	RwLock::SharedLockScope HashLock(LockForHash(ChunkHash));
+
+	return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+}
+
+// Returns true if the in-memory index contains an entry for ChunkHash
+bool
+FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	RwLock::SharedLockScope IndexLock(m_Lock);
+	const bool				IsIndexed = m_Index.contains(ChunkHash);
+	return IsIndexed;
+}
+
+// Deletes the payload file of a chunk and, once the file is gone, removes the
+// chunk from the index and appends a tombstone entry to the log.
+//
+// ChunkHash - hash of the chunk to delete
+// Ec        - receives the result of the file removal; errors are reported
+//             through this parameter rather than by throwing
+void
+FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
+{
+	ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+
+	// The size is only used for logging and for the tombstone entry, so failing to
+	// get it is not fatal
+	uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
+	if (Ec)
+	{
+		ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
+		FileSize = 0;
+	}
+
+	ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize));
+	std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+
+	// A failed removal still counts if the file is verifiably absent. The non-throwing
+	// overload of exists() is used so that this function keeps reporting problems via
+	// Ec only; if existence cannot be determined the chunk is left in the index
+	bool IsGone = !Ec;
+	if (!IsGone)
+	{
+		std::error_code ExistsEc;
+		const bool		StillExists = std::filesystem::exists(Name.ShardedPath.c_str(), ExistsEc);
+		IsGone						= !StillExists && !ExistsEc;
+	}
+
+	if (IsGone)
+	{
+		{
+			RwLock::ExclusiveLockScope _(m_Lock);
+			if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+			{
+				m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed);
+				m_Index.erase(It);
+			}
+		}
+		m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize});
+	}
+}
+
+// Removes from InOutChunks every hash for which HaveChunk() returns true
+void
+FileCasStrategy::FilterChunks(HashKeySet& InOutChunks)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	// NOTE: this is not an issue today, but should a GC ever run while this is in
+	// flight the result could be wrong, since chunks may disappear in the meantime.
+	//
+	// A pinning mechanism would make that less likely. However, chunks can go away
+	// at any point after the result has been handed back to the caller, so anyone
+	// consuming this functionality has to take that into account regardless
+
+	const auto IsAlreadyStored = [this](const IoHash& Hash) { return HaveChunk(Hash); };
+	InOutChunks.RemoveHashesIf(IsAlreadyStored);
+}
+
+// Invokes Callback once per index entry, passing the chunk hash and a buffer
+// created from the chunk's payload file. The index lock is held (shared) for the
+// whole iteration, including while Callback runs.
+void
+FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	RwLock::SharedLockScope IndexLock(m_Lock);
+	for (const auto& IndexIt : m_Index)
+	{
+		const IoHash&  ChunkHash = IndexIt.first;
+		ShardingHelper ShardName(m_RootDirectory.c_str(), ChunkHash);
+		Callback(ChunkHash, IoBufferBuilder::MakeFromFile(ShardName.ShardedPath.c_str()));
+	}
+}
+
+// Intentionally a no-op, see below
+void
+FileCasStrategy::Flush()
+{
+	// Files are not kept open once they have been written, so there is nothing
+	// specific to flush here.
+	//
+	// Depending on the semantics Flush() is meant to provide, one could argue
+	// that it should flush the volume holding the CAS files, so that metadata
+	// is flushed together with the file data
+	//
+	// Related: keeping a log of when chunks were created would allow for more
+	// targeted validation during recovery
+}
+
+// Validates every chunk in the store.
+//
+// Payload files found on disk but missing from the index are added to the index
+// (and the log) first. Each chunk is then checked against its hash; chunks which
+// cannot be opened or do not match are reported to Ctx as bad, and their backing
+// files are deleted when Ctx requests recovery.
+void
+FileCasStrategy::Scrub(ScrubContext& Ctx)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	std::vector<IoHash> BadHashes;
+	uint64_t			ChunkCount = 0;
+	uint64_t			ChunkBytes = 0;
+
+	// Pick up payload files which exist on disk but which the index does not know about
+	{
+		std::vector<FileCasStrategy::FileCasIndexEntry> FoundOnDisk = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+		RwLock::ExclusiveLockScope						IndexLock(m_Lock);
+		for (const FileCasStrategy::FileCasIndexEntry& Found : FoundOnDisk)
+		{
+			const bool WasAdded = m_Index.insert({Found.Key, {.Size = Found.Size}}).second;
+			if (!WasAdded)
+			{
+				continue;
+			}
+			m_TotalSize.fetch_add(static_cast<uint64_t>(Found.Size), std::memory_order::relaxed);
+			m_CasLog.Append({.Key = Found.Key, .Size = Found.Size});
+		}
+	}
+
+	const auto MarkBad = [&BadHashes](const IoHash& Hash) { BadHashes.push_back(Hash); };
+
+	IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+		// A payload which could not be opened is bad and is not counted as scrubbed
+		if (!Payload)
+		{
+			MarkBad(Hash);
+			return;
+		}
+
+		ChunkCount += 1;
+		ChunkBytes += Payload.GetSize();
+
+		IoHash	   RawHash;
+		uint64_t   RawSize;
+		const bool HasValidHeader = CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize);
+		if (HasValidHeader)
+		{
+			// The raw hash from the compressed header must match the hash the chunk is stored under
+			if (RawHash != Hash)
+			{
+				MarkBad(Hash);
+			}
+			return;
+		}
+
+#if ZEN_WITH_TESTS
+		// Test builds additionally accept payloads whose hash over the stored bytes matches
+		if (IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))) == Hash)
+		{
+			return;
+		}
+#endif
+		MarkBad(Hash);
+	});
+
+	Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
+
+	const bool FoundBadChunks = !BadHashes.empty();
+	if (FoundBadChunks)
+	{
+		ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+
+		if (Ctx.RunRecovery())
+		{
+			ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+
+			for (const IoHash& BadHash : BadHashes)
+			{
+				std::error_code DeleteEc;
+				DeleteChunk(BadHash, DeleteEc);
+
+				if (DeleteEc)
+				{
+					ZEN_WARN("failed to delete file for chunk {}", BadHash);
+				}
+			}
+		}
+	}
+
+	// Pass the bad chunks on to whoever is interested. Higher level data
+	// structures could use this to invalidate themselves more efficiently
+	// than a full validation pass would be able to
+	Ctx.ReportBadCidChunks(BadHashes);
+
+	ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
+}
+
+// Deletes every chunk which GcCtx.FilterCids() does not ask to keep.
+//
+// All chunks are visited via IterateChunks(); the ones not kept are deleted if
+// the context is in deletion mode, and the list of chunks selected for deletion
+// is reported back to the context.
+void
+FileCasStrategy::CollectGarbage(GcContext& GcCtx)
+{
+	ZEN_ASSERT(m_IsInitialized);
+
+	ZEN_DEBUG("collecting garbage from {}", m_RootDirectory);
+
+	std::vector<IoHash>	  ChunksToDelete;
+	std::atomic<uint64_t> ChunksToDeleteBytes{0};
+	std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
+
+	// Single element scratch vector, reused for every FilterCids() call
+	std::vector<IoHash> CandidateCas(1);
+
+	uint64_t	   DeletedCount = 0;
+	const uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+
+	// Report a summary on every way out of this function
+	Stopwatch  TotalTimer;
+	const auto ReportOnExit = MakeGuard([&] {
+		ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}",
+				  m_RootDirectory,
+				  NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+				  DeletedCount,
+				  ChunkCount,
+				  NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)),
+				  NiceBytes(OldTotalSize));
+	});
+
+	IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+		// The filter invokes its callback for the hashes which are to be kept
+		bool KeepThis	= false;
+		CandidateCas[0] = Hash;
+		GcCtx.FilterCids(CandidateCas, [&](const IoHash&) { KeepThis = true; });
+
+		const uint64_t PayloadSize = Payload.GetSize();
+
+		if (!KeepThis)
+		{
+			ChunksToDelete.push_back(Hash);
+			ChunksToDeleteBytes.fetch_add(PayloadSize);
+		}
+
+		++ChunkCount;
+		ChunkBytes.fetch_add(PayloadSize);
+	});
+
+	// TODO, any entries we did not encounter during our IterateChunks should be removed from the index
+
+	if (ChunksToDelete.empty())
+	{
+		ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory);
+		return;
+	}
+
+	ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
+			  m_RootDirectory,
+			  ChunksToDelete.size(),
+			  ChunkCount.load(),
+			  NiceBytes(ChunksToDeleteBytes));
+
+	if (!GcCtx.IsDeletionMode())
+	{
+		ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled");
+
+		return;
+	}
+
+	for (const IoHash& Garbage : ChunksToDelete)
+	{
+		ZEN_TRACE("deleting chunk {}", Garbage);
+
+		std::error_code DeleteEc;
+		DeleteChunk(Garbage, DeleteEc);
+
+		if (!DeleteEc)
+		{
+			DeletedCount++;
+		}
+		else
+		{
+			ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Garbage, DeleteEc.message());
+		}
+	}
+
+	GcCtx.AddDeletedCids(ChunksToDelete);
+}
+
+// Checks an index/log entry for plausibility.
+//
+// Returns true if the entry is acceptable. Otherwise returns false and stores a
+// description of the problem in OutReason. An entry is rejected if its key is
+// the zero hash, if it has any flag other than the tombstone flag set, or if it
+// is not a tombstone and has a size of zero.
+bool
+FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
+{
+	const bool HasZeroKey = (Entry.Key == IoHash::Zero);
+	if (HasZeroKey)
+	{
+		OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
+		return false;
+	}
+
+	const bool HasUnknownFlags = (Entry.Flags & (~FileCasIndexEntry::kTombStone)) != 0;
+	if (HasUnknownFlags)
+	{
+		OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
+		return false;
+	}
+
+	// The size is only checked for entries which are not tombstones
+	if (!Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
+	{
+		const uint64_t Size = Entry.Size;
+		if (Size == 0)
+		{
+			OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
+			return false;
+		}
+	}
+
+	return true;
+}
+
+// Writes the current in-memory index to the index snapshot file.
+//
+// Does nothing if no log entries have been added since the last snapshot. While
+// the new snapshot is written the previous one is kept under a temporary name
+// and put back if writing fails.
+void
+FileCasStrategy::MakeIndexSnapshot()
+{
+	using namespace filecas::impl;
+	namespace fs = std::filesystem;
+
+	const uint64_t LogCount = m_CasLog.GetLogCount();
+	if (m_LogFlushPosition == LogCount)
+	{
+		// The existing snapshot already covers the whole log
+		return;
+	}
+
+	ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
+
+	uint64_t   EntryCount = 0;
+	Stopwatch  Timer;
+	const auto ReportOnExit = MakeGuard([&] {
+		ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
+				 m_RootDirectory,
+				 EntryCount,
+				 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+	});
+
+	const fs::path IndexPath  = GetIndexPath(m_RootDirectory);
+	const fs::path BackupPath = GetTempIndexPath(m_RootDirectory);
+
+	// Set the current index aside so that it can be restored should anything go wrong
+	if (fs::is_regular_file(BackupPath))
+	{
+		fs::remove(BackupPath);
+	}
+	if (fs::is_regular_file(IndexPath))
+	{
+		fs::rename(IndexPath, BackupPath);
+	}
+
+	try
+	{
+		// Flatten the index map into the array of entries making up the snapshot
+		std::vector<FileCasIndexEntry> Entries;
+		Entries.resize(m_Index.size());
+
+		auto Out = Entries.begin();
+		for (auto& Entry : m_Index)
+		{
+			Out->Key  = Entry.first;
+			Out->Size = Entry.second.Size;
+			++Out;
+		}
+
+		FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount};
+		Header.Checksum			  = FileCasIndexHeader::ComputeChecksum(Header);
+
+		// File layout: the header, directly followed by the entries
+		BasicFile ObjectIndexFile;
+		ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate);
+		ObjectIndexFile.Write(&Header, sizeof(FileCasIndexHeader), 0);
+		ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
+		ObjectIndexFile.Flush();
+		ObjectIndexFile.Close();
+
+		EntryCount		   = Entries.size();
+		m_LogFlushPosition = LogCount;
+	}
+	catch (const std::exception& Err)
+	{
+		ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what());
+
+		// Put the previous snapshot, if there was one, back in place
+		if (fs::is_regular_file(BackupPath))
+		{
+			fs::remove(IndexPath);
+			fs::rename(BackupPath, IndexPath);
+		}
+	}
+
+	// On success the set aside snapshot is no longer needed
+	if (fs::is_regular_file(BackupPath))
+	{
+		fs::remove(BackupPath);
+	}
+}
+// Populates the in-memory index from persistent state.
+//
+// If an index snapshot file exists and is valid, its entries are loaded and the
+// log position stored in its header is returned. If the file exists but is not
+// valid, nothing is loaded and 0 is returned. If there is no index file but the
+// root directory exists, the directory is scanned for payload files; the log is
+// rewritten from the scan result and 0 is returned.
+uint64_t
+FileCasStrategy::ReadIndexFile()
+{
+	using namespace filecas::impl;
+
+	std::vector<FileCasIndexEntry> Entries;
+	std::filesystem::path		   IndexPath = GetIndexPath(m_RootDirectory);
+	if (std::filesystem::is_regular_file(IndexPath))
+	{
+		Stopwatch  Timer;
+		const auto _ = MakeGuard([&] {
+			ZEN_INFO("read store '{}' index containing {} entries in {}",
+					 IndexPath,
+					 Entries.size(),
+					 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+		});
+
+		BasicFile ObjectIndexFile;
+		ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
+		uint64_t Size = ObjectIndexFile.FileSize();
+		if (Size >= sizeof(FileCasIndexHeader))
+		{
+			// Upper bound on the number of entries the file can hold: whatever follows the
+			// header, divided by the entry size. The header's EntryCount is validated against
+			// this so that a corrupt count can never make us read past the end of the file
+			uint64_t		   ExpectedEntryCount = (Size - sizeof(FileCasIndexHeader)) / sizeof(FileCasIndexEntry);
+			FileCasIndexHeader Header;
+			ObjectIndexFile.Read(&Header, sizeof(Header), 0);
+			if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) &&
+				(Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount))
+			{
+				Entries.resize(Header.EntryCount);
+				ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader));
+
+				std::string InvalidEntryReason;
+				for (const FileCasIndexEntry& Entry : Entries)
+				{
+					if (!ValidateEntry(Entry, InvalidEntryReason))
+					{
+						ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason);
+						continue;
+					}
+					m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+				}
+
+				return Header.LogPosition;
+			}
+			else
+			{
+				ZEN_WARN("skipping invalid index file '{}'", IndexPath);
+			}
+		}
+		return 0;
+	}
+
+	if (std::filesystem::is_directory(m_RootDirectory))
+	{
+		// No snapshot available, rebuild the index (and the log) from the payload files on disk
+		ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory);
+		TCasLogFile<FileCasIndexEntry> CasLog;
+		uint64_t					   TotalSize = 0;
+		Stopwatch					   TotalTimer;
+		const auto					   _ = MakeGuard([&] {
+			ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}",
+					 m_RootDirectory,
+					 NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+					 CasLog.GetLogCount(),
+					 NiceBytes(TotalSize));
+		});
+
+		std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+
+		std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory);
+		CasLog.Open(LogPath, CasLogFile::Mode::kTruncate);
+		std::string InvalidEntryReason;
+		for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
+		{
+			if (!ValidateEntry(Entry, InvalidEntryReason))
+			{
+				ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason);
+				continue;
+			}
+			m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+			CasLog.Append(Entry);
+		}
+
+		CasLog.Close();
+	}
+
+	return 0;
+}
+
+// Replays the store's log file into m_Index, starting after the first SkipEntryCount records.
+//
+// If SkipEntryCount is larger than the number of records in the log, a warning is emitted and the
+// whole log is replayed instead. Records flagged kTombStone erase their key from m_Index; records
+// failing ValidateEntry are skipped and counted; all other records are inserted or overwrite the
+// existing entry for their key.
+//
+// Returns the number of records that were subject to replay (log count minus skipped records), or 0
+// when the log file does not exist or could not be initialized.
+uint64_t
+FileCasStrategy::ReadLog(uint64_t SkipEntryCount)
+{
+	using namespace filecas::impl;
+
+	std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
+	if (!std::filesystem::is_regular_file(LogPath))
+	{
+		return 0;
+	}
+
+	uint64_t ReplayedCount = 0;
+	Stopwatch Timer;
+	const auto _ = MakeGuard([&] {
+		ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, ReplayedCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+	});
+
+	TCasLogFile<FileCasIndexEntry> CasLog;
+	CasLog.Open(LogPath, CasLogFile::Mode::kRead);
+	if (!CasLog.Initialize())
+	{
+		return 0;
+	}
+
+	const uint64_t TotalCount = CasLog.GetLogCount();
+	if (SkipEntryCount > TotalCount)
+	{
+		ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath);
+		SkipEntryCount = 0;
+	}
+	ReplayedCount = TotalCount - SkipEntryCount;
+	m_Index.reserve(ReplayedCount);
+
+	uint64_t RejectedCount = 0;
+	CasLog.Replay(
+		[&](const FileCasIndexEntry& Record) {
+			// Tombstones are applied without validation
+			if (Record.Flags & FileCasIndexEntry::kTombStone)
+			{
+				m_Index.erase(Record.Key);
+				return;
+			}
+
+			std::string Reason;
+			if (ValidateEntry(Record, Reason))
+			{
+				m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size});
+				return;
+			}
+
+			ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, Reason);
+			++RejectedCount;
+		},
+		SkipEntryCount);
+
+	if (RejectedCount != 0)
+	{
+		ZEN_WARN("found {} invalid entries in '{}'", RejectedCount, LogPath);
+	}
+	return ReplayedCount;
+}
+
+// Traverses RootDir and returns one index entry (hash key + file size) for every file whose location
+// matches the sharded layout "<3 chars>/<2 chars>/<35 chars>".
+//
+// The three parts are concatenated into a 40 character string which is parsed as the hex
+// representation of the content hash. Files are ignored when:
+//   - the parent directory relative to RootDir is not exactly "xxx<separator>yy"
+//   - the file name is not exactly 35 characters long
+//   - the concatenated name contains anything other than hex digits
+//
+// The last two checks keep strings of the wrong length or with non-hex characters away from
+// IoHash::FromHexString.
+std::vector<FileCasStrategy::FileCasIndexEntry>
+FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir)
+{
+	using namespace filecas::impl;
+
+	std::vector<FileCasIndexEntry> Entries;
+	struct Visitor : public FileSystemTraversal::TreeVisitor
+	{
+		Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {}
+
+		virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override
+		{
+			using CharType = std::filesystem::path::value_type;
+
+			std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory);
+
+			std::filesystem::path::string_type PathString = RelPath.native();
+
+			// Expect "xxx<sep>yy" as directory part and the remaining 35 hash characters as file name
+			if ((PathString.size() != (3 + 2 + 1)) || (File.size() != (40 - 3 - 2)))
+			{
+				return;
+			}
+
+			// A six character path without a separator at this position (e.g. a single directory named
+			// "abcdef") is not part of the sharded layout; accepting it would produce a 41 character name
+			const CharType Separator = PathString.at(3);
+			if ((Separator != std::filesystem::path::preferred_separator) && (Separator != CharType('/')))
+			{
+				return;
+			}
+			PathString.erase(3, 1);
+			PathString.append(File);
+
+			// Only hex digits can form a valid hash name
+			for (const CharType Char : PathString)
+			{
+				const bool IsDigit = (Char >= CharType('0')) && (Char <= CharType('9'));
+				const bool IsLowerHex = (Char >= CharType('a')) && (Char <= CharType('f'));
+				const bool IsUpperHex = (Char >= CharType('A')) && (Char <= CharType('F'));
+				if (!(IsDigit || IsLowerHex || IsUpperHex))
+				{
+					return;
+				}
+			}
+
+#if ZEN_PLATFORM_WINDOWS
+			StringBuilder<64> Utf8;
+			WideToUtf8(PathString, Utf8);
+			IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()});
+#else
+			IoHash NameHash = IoHash::FromHexString(PathString);
+#endif
+			Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize});
+		}
+
+		// Always descend into sub directories
+		virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent,
+									[[maybe_unused]] const path_view& DirectoryName) override
+		{
+			return true;
+		}
+
+		const std::filesystem::path& RootDirectory;
+		std::vector<FileCasIndexEntry>& Entries;
+	} CasVisitor{RootDir, Entries};
+
+	FileSystemTraversal Traversal;
+	Traversal.TraverseFileSystem(RootDir, CasVisitor);
+	return Entries;
+}
+
+ //////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+// Writes a 1 MiB payload to a file, wraps that file with MakeFromTemporaryFile and checks that
+// inserting it into a freshly initialized store reports the chunk as new.
+TEST_CASE("cas.file.move")
+{
+	// specifying an absolute path here can be helpful when using procmon to dig into things
+	ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};
+
+	GcManager Gc;
+
+	FileCasStrategy FileCas(Gc);
+	FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+	{
+		IoBuffer ZeroBytes{1024 * 1024};
+		IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);
+
+		// Materialize the payload on disk so it can be handed over as a temporary file
+		std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+		{
+			BasicFile PayloadFile;
+			PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
+			PayloadFile.Write(ZeroBytes, 0);
+			PayloadFile.Close();
+		}
+
+		IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
+
+		CasStore::InsertResult InsertResult = FileCas.InsertChunk(Payload1, ZeroHash);
+		CHECK_EQ(InsertResult.New, true);
+	}
+
+# if 0
+	SUBCASE("stresstest")
+	{
+		std::vector<IoHash> PayloadHashes;
+
+		const int kWorkers = 64;
+		const int kItemCount = 128;
+
+		for (int w = 0; w < kWorkers; ++w)
+		{
+			for (int i = 0; i < kItemCount; ++i)
+			{
+				IoBuffer Payload{1024};
+				*reinterpret_cast<int*>(Payload.MutableData()) = i;
+				PayloadHashes.push_back(IoHash::HashBuffer(Payload));
+
+				std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
+				WriteFile(PayloadPath, Payload);
+			}
+		}
+
+		std::barrier Sync{kWorkers};
+
+		auto PopulateAll = [&](int w) {
+			std::vector<IoBuffer> Buffers;
+
+			for (int i = 0; i < kItemCount; ++i)
+			{
+				std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
+				IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
+				Buffers.push_back(Payload);
+				Sync.arrive_and_wait();
+				CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
+			}
+		};
+
+		std::vector<std::jthread> Threads;
+
+		for (int i = 0; i < kWorkers; ++i)
+		{
+			Threads.push_back(std::jthread(PopulateAll, i));
+		}
+
+		for (std::jthread& Thread : Threads)
+		{
+			Thread.join();
+		}
+	}
+# endif
+}
+
+// Inserts 1000 small compact binary objects and verifies garbage collection twice:
+//   1) with nothing retained, no chunk can be found afterwards
+//   2) with only keys whose first hash byte is odd retained, exactly those chunks remain
+TEST_CASE("cas.file.gc")
+{
+	// specifying an absolute path here can be helpful when using procmon to dig into things
+	ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};
+
+	GcManager Gc;
+	FileCasStrategy FileCas(Gc);
+	FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+	const int kIterationCount = 1000;
+
+	// Parentheses select the size constructor explicitly (kIterationCount default constructed keys)
+	std::vector<IoHash> Keys(kIterationCount);
+
+	// (Re)inserts one object per key slot and records the hash it was stored under
+	auto PopulateStore = [&] {
+		int Id = 0;
+		for (IoHash& KeySlot : Keys)
+		{
+			CbObjectWriter Writer;
+			Writer << "id" << Id;
+			++Id;
+			CbObject Object = Writer.Save();
+
+			IoBuffer ObjectBuffer = Object.GetBuffer().AsIoBuffer();
+			IoHash ObjectHash = HashBuffer(ObjectBuffer);
+
+			FileCas.InsertChunk(ObjectBuffer, ObjectHash);
+
+			KeySlot = ObjectHash;
+		}
+	};
+
+	// Keys with an odd first hash byte are the ones retained in the second pass
+	auto ShouldRetain = [](const IoHash& Key) { return (Key.Hash[0] & 1) != 0; };
+
+	// Drop everything
+
+	{
+		PopulateStore();
+
+		GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
+		FileCas.CollectGarbage(Ctx);
+
+		for (const IoHash& Key : Keys)
+		{
+			IoBuffer Chunk = FileCas.FindChunk(Key);
+
+			CHECK(!Chunk);
+		}
+	}
+
+	// Keep roughly half of the chunks
+
+	{
+		PopulateStore();
+
+		GcContext Ctx(GcClock::Now() - std::chrono::hours(24));
+
+		for (const IoHash& Key : Keys)
+		{
+			if (!ShouldRetain(Key))
+			{
+				continue;
+			}
+			Ctx.AddRetainedCids(std::vector<IoHash>{Key});
+		}
+
+		FileCas.CollectGarbage(Ctx);
+
+		for (const IoHash& Key : Keys)
+		{
+			if (ShouldRetain(Key))
+			{
+				CHECK(FileCas.FindChunk(Key));
+			}
+			else
+			{
+				CHECK(!FileCas.FindChunk(Key));
+			}
+		}
+	}
+}
+
+#endif
+
+// Intentionally empty.
+// NOTE(review): presumably exists only so other code can reference a symbol from this translation
+// unit and keep it (and the test cases above) from being dropped at link time - confirm at the
+// call site.
+void
+filecas_forcelink()
+{
+}
+
+} // namespace zen