diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore/filecas.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenstore/filecas.cpp')
| -rw-r--r-- | src/zenstore/filecas.cpp | 1452 |
1 files changed, 1452 insertions, 0 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp new file mode 100644 index 000000000..1d25920c4 --- /dev/null +++ b/src/zenstore/filecas.cpp @@ -0,0 +1,1452 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "filecas.h" + +#include <zencore/compress.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/memory.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> +#include <zencore/thread.h> +#include <zencore/timer.h> +#include <zencore/uid.h> +#include <zenstore/gc.h> +#include <zenstore/scrubcontext.h> +#include <zenutil/basicfile.h> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +#endif + +#include <gsl/gsl-lite.hpp> + +#include <barrier> +#include <filesystem> +#include <functional> +#include <unordered_map> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <xxhash.h> +#if ZEN_PLATFORM_WINDOWS +# include <atlfile.h> +#endif +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +namespace filecas::impl { + const char* IndexExtension = ".uidx"; + const char* LogExtension = ".ulog"; + + std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } + + std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) + { + return RootDir / fmt::format("cas.tmp{}", IndexExtension); + } + + std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } + +#pragma pack(push) +#pragma pack(1) + + struct FileCasIndexHeader + { + static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; + static constexpr uint32_t CurrentVersion = 1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Reserved = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) + { + return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; + + static_assert(sizeof(FileCasIndexHeader) == 32); + +#pragma pack(pop) + +} // namespace filecas::impl + +FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) +{ + ShardedPath.Append(RootPath.c_str()); + + ExtendableStringBuilder<64> HashString; + ChunkHash.ToHexString(HashString); + + const char* str = HashString.c_str(); + + // Shard into a path with two directory levels containing 12 bits and 8 bits + // respectively. + // + // This results in a maximum of 4096 * 256 directories + // + // The numbers have been chosen somewhat arbitrarily but are large to scale + // to very large chunk repositories without creating too many directories + // on a single level since NTFS does not deal very well with this. + // + // It may or may not make sense to make this a configurable policy, and it + // would probably be a good idea to measure performance for different + // policies and chunk counts + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + Shard2len = ShardedPath.Size(); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); +} + +////////////////////////////////////////////////////////////////////////// + +FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ +} + +void +FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) +{ + using namespace filecas::impl; + + m_IsInitialized = true; + + m_RootDirectory = RootDirectory; + + m_Index.clear(); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); + + if (IsNewStore) + { + std::filesystem::remove(LogPath); + std::filesystem::remove(IndexPath); + + if (std::filesystem::is_directory(m_RootDirectory)) + { + // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside + // in this folder as well + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override + { + // We don't care about files + } + static bool IsHexChar(std::filesystem::path::value_type C) + { + return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; + } + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& DirectoryName) override + { + if (DirectoryName.length() == 3) + { + if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) + { + ShardedRoots.push_back(Parent / DirectoryName); + } + } + return false; + } + std::vector<std::filesystem::path> ShardedRoots; + } CasVisitor; + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); + for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) + { + std::filesystem::remove_all(SharededRoot); + } + } + } + + m_LogFlushPosition = ReadIndexFile(); + uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + for (const auto& Entry : m_Index) + { + m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); + } + + CreateDirectories(m_RootDirectory); + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + if (IsNewStore || LogEntryCount > 0) + { + MakeIndexSnapshot(); + } +} + +#if ZEN_PLATFORM_WINDOWS +static void +DeletePayloadFileOnClose(const void* FileHandle) +{ + const HANDLE WinFileHandle = (const HANDLE)FileHandle; + // This will cause the file to be deleted when the last handle to it is closed + FILE_DISPOSITION_INFO Fdi{}; + Fdi.DeleteFile = TRUE; + BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + + if (!Success) + { + // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed + // to delete it. + ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", + PathFromHandle(WinFileHandle), + GetLastErrorAsString()); + } +} +#endif + +CasStore::InsertResult +FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) +{ + ZEN_ASSERT(m_IsInitialized); + +#if !ZEN_WITH_TESTS + ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); +#endif + + if (Mode == CasStore::InsertMode::kCopyOnly) + { + { + RwLock::SharedLockScope _(m_Lock); + if (m_Index.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } + } + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); + } + + // File-based chunks have special case handling whereby we move the file into + // place in the file store directory, thus avoiding unnecessary copying + + IoBufferFileReference FileRef; + if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) + { + { + bool Exists = true; + { + RwLock::SharedLockScope _(m_Lock); + Exists = m_Index.contains(ChunkHash); + } + if (Exists) + { +#if ZEN_PLATFORM_WINDOWS + DeletePayloadFileOnClose(FileRef.FileHandle); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); + if (unlink(FilePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + FilePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } +#endif + return CasStore::InsertResult{.New = false}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); + +#if ZEN_PLATFORM_WINDOWS + const HANDLE ChunkFileHandle = FileRef.FileHandle; + // See if file already exists + { + CAtlFile PayloadFile; + + if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) + { + // If we succeeded in opening the target file then we don't need to do anything else because it already exists + // and should contain the content we were about to insert + + // We do need to ensure the source file goes away on close, however + size_t ChunkSize = Chunk.GetSize(); + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + DeletePayloadFileOnClose(ChunkFileHandle); + + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } + } + else + { + if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) + { + // Shard directory does not exist + } + else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) + { + // Shard directory exists, but not the file + } + else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) + { + // Sharing violation, likely because we are trying to open a file + // which has been renamed on another thread, and the file handle + // used to rename it is still open. We handle this case below + // instead of here + } + else + { + ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); + } + } + } + + std::filesystem::path FullPath(Name.ShardedPath.c_str()); + + std::filesystem::path FilePath = FullPath.parent_path(); + std::wstring FileName = FullPath.native(); + + const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); + FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); + memset(RenameInfo, 0, BufferSize); + + RenameInfo->ReplaceIfExists = FALSE; + RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); + memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); + RenameInfo->FileName[FileName.size()] = 0; + + auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); + + // Try to move file into place + BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + + if (!Success) + { + // The rename/move could fail because the target directory does not yet exist. This code attempts + // to create it + + CAtlFile DirHandle; + + auto InternalCreateDirectoryHandle = [&] { + return DirHandle.Create(FilePath.c_str(), + GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS); + }; + + // It's possible for several threads to enter this logic trying to create the same + // directory. Only one will create the directory of course, but all threads will + // make it through okay + + HRESULT hRes = InternalCreateDirectoryHandle(); + + if (FAILED(hRes)) + { + // TODO: we can handle directory creation more intelligently and efficiently than + // this currently does + + CreateDirectories(FilePath.c_str()); + + hRes = InternalCreateDirectoryHandle(); + } + + if (FAILED(hRes)) + { + ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); + } + + // Retry rename/move + + Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + } + + if (Success) + { + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; + } + + const DWORD LastError = GetLastError(); + + if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) + { + HashLock.ReleaseNow(); + DeletePayloadFileOnClose(ChunkFileHandle); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; + } + + ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", + GetSystemErrorAsString(LastError), + ChunkHash); + + DeletePayloadFileOnClose(ChunkFileHandle); + +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); + if (Ret < 0 && zen::GetLastError() == ENOENT) + { + // Destination directory doesn't exist. Create it any try again. + CreateDirectories(DestPath.parent_path().c_str()); + Ret = link(SourcePath.c_str(), DestPath.c_str()); + } + int LinkError = zen::GetLastError(); + + if (unlink(SourcePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + SourcePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } + + // It is possible that someone beat us to it in linking the file. In that + // case a "file exists" error is okay. All others are not. + if (Ret < 0) + { + if (LinkError == EEXIST) + { + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + + ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", + GetSystemErrorAsString(LinkError), + ChunkHash); + } + else + { + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } +#endif // ZEN_PLATFORM_* + } + + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); +} + +CasStore::InsertResult +FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + { + RwLock::SharedLockScope _(m_Lock); + if (m_Index.contains(ChunkHash)) + { + return {.New = false}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + // See if file already exists + +#if ZEN_PLATFORM_WINDOWS + CAtlFile PayloadFile; + + HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the + // content we were about to insert + + bool IsNew = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + + PayloadFile.Close(); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (access(Name.ShardedPath.c_str(), F_OK) == 0) + { + return CasStore::InsertResult{.New = false}; + } +#endif + + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); + +#if ZEN_PLATFORM_WINDOWS + // For now, use double-checked locking to see if someone else was first + + hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + + if (SUCCEEDED(hRes)) + { + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + // If we succeeded in opening the file then and the size is correct we don't need to do anything + // else because someone else managed to create the file before we did. Just return. + + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } + } + + if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) + { + ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); + } + + auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + + hRes = InternalCreateFile(); + + if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) + { + // Ensure parent directories exist and retry file creation + CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); + hRes = InternalCreateFile(); + } + + if (FAILED(hRes)) + { + ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); + } +#else + // Attempt to exclusively create the file. + auto InternalCreateFile = [&] { + int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); + if (Fd >= 0) + { + fchmod(Fd, 0666); + } + return Fd; + }; + int Fd = InternalCreateFile(); + if (Fd < 0) + { + switch (zen::GetLastError()) + { + case EEXIST: + // Another thread has beat us to it so we're golden. + { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return {.New = IsNew}; + } + break; + + case ENOENT: + if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) + { + Fd = InternalCreateFile(); + if (Fd >= 0) + { + break; + } + } + ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); + + default: + ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); + } + } + + struct FdWrapper + { + ~FdWrapper() { Close(); } + void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } + void Close() + { + if (Fd >= 0) + { + close(Fd); + Fd = -1; + } + } + int Fd; + } PayloadFile = {Fd}; +#endif // ZEN_PLATFORM_WINDOWS + + size_t ChunkRemain = ChunkSize; + auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData); + + while (ChunkRemain != 0) + { + uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain)); + + PayloadFile.Write(ChunkCursor, ByteCount); + + ChunkCursor += ByteCount; + ChunkRemain -= ByteCount; + } + + // We cannot rely on RAII to close the file handle since it would be closed + // *after* the lock is released due to the initialization order + PayloadFile.Close(); + + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + return {.New = IsNew}; +} + +IoBuffer +FileCasStrategy::FindChunk(const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + { + RwLock::SharedLockScope _(m_Lock); + if (!m_Index.contains(ChunkHash)) + { + return {}; + } + } + + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + RwLock::SharedLockScope _(LockForHash(ChunkHash)); + + return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); +} + +bool +FileCasStrategy::HaveChunk(const IoHash& ChunkHash) +{ + ZEN_ASSERT(m_IsInitialized); + + RwLock::SharedLockScope _(m_Lock); + return m_Index.contains(ChunkHash); +} + +void +FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) +{ + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + + uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); + if (Ec) + { + ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); + FileSize = 0; + } + + ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); + std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + + if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) + { + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); + m_Index.erase(It); + } + } + m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); + } +} + +void +FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) +{ + ZEN_ASSERT(m_IsInitialized); + + // NOTE: it's not a problem now, but in the future if a GC should happen while this + // is in flight, the result could be wrong since chunks could go away in the meantime. + // + // It would be good to have a pinning mechanism to make this less likely but + // given that chunks could go away at any point after the results are returned to + // a caller, this is something which needs to be taken into account by anyone consuming + // this functionality in any case + + InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); +} + +void +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) +{ + ZEN_ASSERT(m_IsInitialized); + + RwLock::SharedLockScope _(m_Lock); + for (const auto& It : m_Index) + { + const IoHash& NameHash = It.first; + ShardingHelper Name(m_RootDirectory.c_str(), NameHash); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + Callback(NameHash, std::move(Payload)); + } +} + +void +FileCasStrategy::Flush() +{ + // Since we don't keep files open after writing there's nothing specific + // to flush here. + // + // Depending on what semantics we want Flush() to provide, it could be + // argued that this should just flush the volume which we are using to + // store the CAS files on here, to ensure metadata is flushed along + // with file data + // + // Related: to facilitate more targeted validation during recovery we could + // maintain a log of when chunks were created +} + +void +FileCasStrategy::Scrub(ScrubContext& Ctx) +{ + ZEN_ASSERT(m_IsInitialized); + + std::vector<IoHash> BadHashes; + uint64_t ChunkCount{0}, ChunkBytes{0}; + + { + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + RwLock::ExclusiveLockScope _(m_Lock); + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); + m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); + } + } + } + + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + if (!Payload) + { + BadHashes.push_back(Hash); + return; + } + ++ChunkCount; + ChunkBytes += Payload.GetSize(); + + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) + { + if (RawHash != Hash) + { + // Hash mismatch + BadHashes.push_back(Hash); + return; + } + return; + } +#if ZEN_WITH_TESTS + IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); + if (ComputedHash == Hash) + { + return; + } +#endif + BadHashes.push_back(Hash); + }); + + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + + if (!BadHashes.empty()) + { + ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + + if (Ctx.RunRecovery()) + { + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + + for (const IoHash& Hash : BadHashes) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}", Hash); + } + } + } + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + Ctx.ReportBadCidChunks(BadHashes); + + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); +} + +void +FileCasStrategy::CollectGarbage(GcContext& GcCtx) +{ + ZEN_ASSERT(m_IsInitialized); + + ZEN_DEBUG("collecting garbage from {}", m_RootDirectory); + + std::vector<IoHash> ChunksToDelete; + std::atomic<uint64_t> ChunksToDeleteBytes{0}; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; + + std::vector<IoHash> CandidateCas; + CandidateCas.resize(1); + + uint64_t DeletedCount = 0; + uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", + m_RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + DeletedCount, + ChunkCount, + NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), + NiceBytes(OldTotalSize)); + }); + + IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + bool KeepThis = false; + CandidateCas[0] = Hash; + GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { + ZEN_UNUSED(Hash); + KeepThis = true; + }); + + const uint64_t FileSize = Payload.GetSize(); + + if (!KeepThis) + { + ChunksToDelete.push_back(Hash); + ChunksToDeleteBytes.fetch_add(FileSize); + } + + ++ChunkCount; + ChunkBytes.fetch_add(FileSize); + }); + + // TODO, any entires we did not encounter during our IterateChunks should be removed from the index + + if (ChunksToDelete.empty()) + { + ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); + return; + } + + ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})", + m_RootDirectory, + ChunksToDelete.size(), + ChunkCount.load(), + NiceBytes(ChunksToDeleteBytes)); + + if (GcCtx.IsDeletionMode() == false) + { + ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled"); + + return; + } + + for (const IoHash& Hash : ChunksToDelete) + { + ZEN_TRACE("deleting chunk {}", Hash); + + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); + continue; + } + DeletedCount++; + } + + GcCtx.AddDeletedCids(ChunksToDelete); +} + +bool +FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) +{ + if (Entry.Key == IoHash::Zero) + { + OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); + return false; + } + if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) + { + OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); + return false; + } + if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) + { + return true; + } + uint64_t Size = Entry.Size; + if (Size == 0) + { + OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); + return false; + } + return true; +} + +void +FileCasStrategy::MakeIndexSnapshot() +{ + using namespace filecas::impl; + + uint64_t LogCount = m_CasLog.GetLogCount(); + if (m_LogFlushPosition == LogCount) + { + return; + } + ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", + m_RootDirectory, + EntryCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + namespace fs = std::filesystem; + + fs::path IndexPath = GetIndexPath(m_RootDirectory); + fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); + + // Move index away, we keep it if something goes wrong + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } + if (fs::is_regular_file(IndexPath)) + { + fs::rename(IndexPath, STmpIndexPath); + } + + try + { + // Write the current state of the location map to a new index state + std::vector<FileCasIndexEntry> Entries; + + { + Entries.resize(m_Index.size()); + + uint64_t EntryIndex = 0; + for (auto& Entry : m_Index) + { + FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Size = Entry.second.Size; + } + } + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); + filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount}; + + Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); + + ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); + ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); + ObjectIndexFile.Flush(); + ObjectIndexFile.Close(); + EntryCount = Entries.size(); + m_LogFlushPosition = LogCount; + } + catch (std::exception& Err) + { + ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); + + // Restore any previous snapshot + + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(IndexPath); + fs::rename(STmpIndexPath, IndexPath); + } + } + if (fs::is_regular_file(STmpIndexPath)) + { + fs::remove(STmpIndexPath); + } +} +uint64_t +FileCasStrategy::ReadIndexFile() +{ + using namespace filecas::impl; + + std::vector<FileCasIndexEntry> Entries; + std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); + if (std::filesystem::is_regular_file(IndexPath)) + { + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", + IndexPath, + Entries.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t Size = ObjectIndexFile.FileSize(); + if (Size >= sizeof(FileCasIndexHeader)) + { + uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); + FileCasIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); + if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && + (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) + { + Entries.resize(Header.EntryCount); + ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); + + std::string InvalidEntryReason; + for (const FileCasIndexEntry& Entry : Entries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + } + + return Header.LogPosition; + } + else + { + ZEN_WARN("skipping invalid index file '{}'", IndexPath); + } + } + return 0; + } + + if (std::filesystem::is_directory(m_RootDirectory)) + { + ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); + TCasLogFile<FileCasIndexEntry> CasLog; + uint64_t TotalSize = 0; + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", + m_RootDirectory, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + CasLog.GetLogCount(), + NiceBytes(TotalSize)); + }); + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + + std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); + CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); + std::string InvalidEntryReason; + for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) + { + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); + continue; + } + m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + CasLog.Append(Entry); + } + + CasLog.Close(); + } + + return 0; +} + +uint64_t +FileCasStrategy::ReadLog(uint64_t SkipEntryCount) +{ + using namespace filecas::impl; + + std::filesystem::path LogPath = GetLogPath(m_RootDirectory); + if (std::filesystem::is_regular_file(LogPath)) + { + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + TCasLogFile<FileCasIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (CasLog.Initialize()) + { + uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + LogEntryCount = EntryCount - SkipEntryCount; + m_Index.reserve(LogEntryCount); + uint64_t InvalidEntryCount = 0; + CasLog.Replay( + [&](const FileCasIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Flags & FileCasIndexEntry::kTombStone) + { + m_Index.erase(Record.Key); + return; + } + if (!ValidateEntry(Record, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; + } + m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); + }, + SkipEntryCount); + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath); + } + return LogEntryCount; + } + } + return 0; +} + +std::vector<FileCasStrategy::FileCasIndexEntry> +FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) +{ + using namespace filecas::impl; + + std::vector<FileCasIndexEntry> Entries; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + { + std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); + + std::filesystem::path::string_type PathString = RelPath.native(); + + if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) + { + if (PathString.at(3) == std::filesystem::path::preferred_separator) + { + PathString.erase(3, 1); + } + PathString.append(File); + + // TODO: should validate that we're actually dealing with a valid hex string here +#if ZEN_PLATFORM_WINDOWS + StringBuilder<64> Utf8; + WideToUtf8(PathString, Utf8); + IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); +#else + IoHash NameHash = IoHash::FromHexString(PathString); +#endif + Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); + } + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& DirectoryName) override + { + return true; + } + + const std::filesystem::path& RootDirectory; + std::vector<FileCasIndexEntry>& Entries; + } CasVisitor{RootDir, Entries}; + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(RootDir, CasVisitor); + return Entries; +}; + + ////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +TEST_CASE("cas.file.move") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + GcManager Gc; + + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + { + std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; + + IoBuffer ZeroBytes{1024 * 1024}; + IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); + + BasicFile PayloadFile; + PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); + PayloadFile.Write(ZeroBytes, 0); + PayloadFile.Close(); + + IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); + + CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash); + CHECK_EQ(Result.New, true); + } + +# if 0 + SUBCASE("stresstest") + { + std::vector<IoHash> PayloadHashes; + + const int kWorkers = 64; + const int kItemCount = 128; + + for (int w = 0; w < kWorkers; ++w) + { + for (int i = 0; i < kItemCount; ++i) + { + IoBuffer Payload{1024}; + *reinterpret_cast<int*>(Payload.MutableData()) = i; + PayloadHashes.push_back(IoHash::HashBuffer(Payload)); + + std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; + WriteFile(PayloadPath, Payload); + } + } + + std::barrier Sync{kWorkers}; + + auto PopulateAll = [&](int w) { + std::vector<IoBuffer> Buffers; + + for (int i = 0; i < kItemCount; ++i) + { + std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; + IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath); + Buffers.push_back(Payload); + Sync.arrive_and_wait(); + CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]); + } + }; + + std::vector<std::jthread> Threads; + + for (int i = 0; i < kWorkers; ++i) + { + Threads.push_back(std::jthread(PopulateAll, i)); + } + + for (std::jthread& Thread : Threads) + { + Thread.join(); + } + } +# endif +} + +TEST_CASE("cas.file.gc") +{ + // specifying an absolute path here can be helpful when using procmon to dig into things + ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; + + GcManager Gc; + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + const int kIterationCount = 1000; + std::vector<IoHash> Keys{kIterationCount}; + + auto InsertChunks = [&] { + for (int i = 0; i < kIterationCount; ++i) + { + CbObjectWriter Cbo; + Cbo << "id" << i; + CbObject Obj = Cbo.Save(); + + IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); + IoHash Hash = HashBuffer(ObjBuffer); + + FileCas.InsertChunk(ObjBuffer, Hash); + + Keys[i] = Hash; + } + }; + + // Drop everything + + { + InsertChunks(); + + GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); + FileCas.CollectGarbage(Ctx); + + for (const IoHash& Key : Keys) + { + IoBuffer Chunk = FileCas.FindChunk(Key); + + CHECK(!Chunk); + } + } + + // Keep roughly half of the chunks + + { + InsertChunks(); + + GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); + + for (const IoHash& Key : Keys) + { + if (Key.Hash[0] & 1) + { + Ctx.AddRetainedCids(std::vector<IoHash>{Key}); + } + } + + FileCas.CollectGarbage(Ctx); + + for (const IoHash& Key : Keys) + { + if (Key.Hash[0] & 1) + { + CHECK(FileCas.FindChunk(Key)); + } + else + { + CHECK(!FileCas.FindChunk(Key)); + } + } + } +} + +#endif + +void +filecas_forcelink() +{ +} + +} // namespace zen |