diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenstore/filecas.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenstore/filecas.cpp')
| -rw-r--r-- | zenstore/filecas.cpp | 1452 |
1 files changed, 0 insertions, 1452 deletions
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp deleted file mode 100644 index 1d25920c4..000000000 --- a/zenstore/filecas.cpp +++ /dev/null @@ -1,1452 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "filecas.h" - -#include <zencore/compress.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/memory.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/thread.h> -#include <zencore/timer.h> -#include <zencore/uid.h> -#include <zenstore/gc.h> -#include <zenstore/scrubcontext.h> -#include <zenutil/basicfile.h> - -#if ZEN_WITH_TESTS -# include <zencore/compactbinarybuilder.h> -#endif - -#include <gsl/gsl-lite.hpp> - -#include <barrier> -#include <filesystem> -#include <functional> -#include <unordered_map> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <xxhash.h> -#if ZEN_PLATFORM_WINDOWS -# include <atlfile.h> -#endif -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -namespace filecas::impl { - const char* IndexExtension = ".uidx"; - const char* LogExtension = ".ulog"; - - std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } - - std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) - { - return RootDir / fmt::format("cas.tmp{}", IndexExtension); - } - - std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } - -#pragma pack(push) -#pragma pack(1) - - struct FileCasIndexHeader - { - static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; - static constexpr uint32_t CurrentVersion = 1; - - uint32_t Magic = ExpectedMagic; - uint32_t Version = CurrentVersion; - uint64_t EntryCount = 0; - uint64_t LogPosition = 0; - uint32_t Reserved = 0; - uint32_t Checksum = 0; - - static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) - { - return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); - } - }; - - static_assert(sizeof(FileCasIndexHeader) == 32); - -#pragma pack(pop) - -} // namespace filecas::impl - -FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) -{ - ShardedPath.Append(RootPath.c_str()); - - ExtendableStringBuilder<64> HashString; - ChunkHash.ToHexString(HashString); - - const char* str = HashString.c_str(); - - // Shard into a path with two directory levels containing 12 bits and 8 bits - // respectively. - // - // This results in a maximum of 4096 * 256 directories - // - // The numbers have been chosen somewhat arbitrarily but are large to scale - // to very large chunk repositories without creating too many directories - // on a single level since NTFS does not deal very well with this. - // - // It may or may not make sense to make this a configurable policy, and it - // would probably be a good idea to measure performance for different - // policies and chunk counts - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - Shard2len = ShardedPath.Size(); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); -} - -////////////////////////////////////////////////////////////////////////// - -FileCasStrategy::FileCasStrategy(GcManager& Gc) : GcStorage(Gc), m_Log(logging::Get("filecas")) -{ -} - -FileCasStrategy::~FileCasStrategy() -{ -} - -void -FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) -{ - using namespace filecas::impl; - - m_IsInitialized = true; - - m_RootDirectory = RootDirectory; - - m_Index.clear(); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); - - if (IsNewStore) - { - std::filesystem::remove(LogPath); - std::filesystem::remove(IndexPath); - - if (std::filesystem::is_directory(m_RootDirectory)) - { - // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside - // in this folder as well - struct Visitor : public FileSystemTraversal::TreeVisitor - { - virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override - { - // We don't care about files - } - static bool IsHexChar(std::filesystem::path::value_type C) - { - return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; - } - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - if (DirectoryName.length() == 3) - { - if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) - { - ShardedRoots.push_back(Parent / DirectoryName); - } - } - return false; - } - std::vector<std::filesystem::path> ShardedRoots; - } CasVisitor; - - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); - for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) - { - std::filesystem::remove_all(SharededRoot); - } - } - } - - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); - for (const auto& Entry : m_Index) - { - m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); - } - - CreateDirectories(m_RootDirectory); - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - - if (IsNewStore || LogEntryCount > 0) - { - MakeIndexSnapshot(); - } -} - -#if ZEN_PLATFORM_WINDOWS -static void -DeletePayloadFileOnClose(const void* FileHandle) -{ - const HANDLE WinFileHandle = (const HANDLE)FileHandle; - // This will cause the file to be deleted when the last handle to it is closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", - PathFromHandle(WinFileHandle), - GetLastErrorAsString()); - } -} -#endif - -CasStore::InsertResult -FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) -{ - ZEN_ASSERT(m_IsInitialized); - -#if !ZEN_WITH_TESTS - ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); -#endif - - if (Mode == CasStore::InsertMode::kCopyOnly) - { - { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) - { - return CasStore::InsertResult{.New = false}; - } - } - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); - } - - // File-based chunks have special case handling whereby we move the file into - // place in the file store directory, thus avoiding unnecessary copying - - IoBufferFileReference FileRef; - if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) - { - { - bool Exists = true; - { - RwLock::SharedLockScope _(m_Lock); - Exists = m_Index.contains(ChunkHash); - } - if (Exists) - { -#if ZEN_PLATFORM_WINDOWS - DeletePayloadFileOnClose(FileRef.FileHandle); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); - if (unlink(FilePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - FilePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } -#endif - return CasStore::InsertResult{.New = false}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - const HANDLE ChunkFileHandle = FileRef.FileHandle; - // See if file already exists - { - CAtlFile PayloadFile; - - if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) - { - // If we succeeded in opening the target file then we don't need to do anything else because it already exists - // and should contain the content we were about to insert - - // We do need to ensure the source file goes away on close, however - size_t ChunkSize = Chunk.GetSize(); - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - - DeletePayloadFileOnClose(ChunkFileHandle); - - return CasStore::InsertResult{.New = IsNew}; - } - else - { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - } - } - else - { - if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) - { - // Shard directory does not exist - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) - { - // Shard directory exists, but not the file - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) - { - // Sharing violation, likely because we are trying to open a file - // which has been renamed on another thread, and the file handle - // used to rename it is still open. We handle this case below - // instead of here - } - else - { - ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); - } - } - } - - std::filesystem::path FullPath(Name.ShardedPath.c_str()); - - std::filesystem::path FilePath = FullPath.parent_path(); - std::wstring FileName = FullPath.native(); - - const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); - FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); - memset(RenameInfo, 0, BufferSize); - - RenameInfo->ReplaceIfExists = FALSE; - RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); - memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); - RenameInfo->FileName[FileName.size()] = 0; - - auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); - - // Try to move file into place - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - - if (!Success) - { - // The rename/move could fail because the target directory does not yet exist. This code attempts - // to create it - - CAtlFile DirHandle; - - auto InternalCreateDirectoryHandle = [&] { - return DirHandle.Create(FilePath.c_str(), - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS); - }; - - // It's possible for several threads to enter this logic trying to create the same - // directory. Only one will create the directory of course, but all threads will - // make it through okay - - HRESULT hRes = InternalCreateDirectoryHandle(); - - if (FAILED(hRes)) - { - // TODO: we can handle directory creation more intelligently and efficiently than - // this currently does - - CreateDirectories(FilePath.c_str()); - - hRes = InternalCreateDirectoryHandle(); - } - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); - } - - // Retry rename/move - - Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - } - - if (Success) - { - m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - - return CasStore::InsertResult{.New = IsNew}; - } - - const DWORD LastError = GetLastError(); - - if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) - { - HashLock.ReleaseNow(); - DeletePayloadFileOnClose(ChunkFileHandle); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - - return CasStore::InsertResult{.New = IsNew}; - } - - ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LastError), - ChunkHash); - - DeletePayloadFileOnClose(ChunkFileHandle); - -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); - if (Ret < 0 && zen::GetLastError() == ENOENT) - { - // Destination directory doesn't exist. Create it any try again. - CreateDirectories(DestPath.parent_path().c_str()); - Ret = link(SourcePath.c_str(), DestPath.c_str()); - } - int LinkError = zen::GetLastError(); - - if (unlink(SourcePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - SourcePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } - - // It is possible that someone beat us to it in linking the file. In that - // case a "file exists" error is okay. All others are not. - if (Ret < 0) - { - if (LinkError == EEXIST) - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LinkError), - ChunkHash); - } - else - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } -#endif // ZEN_PLATFORM_* - } - - return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -CasStore::InsertResult -FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) - { - return {.New = false}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - // See if file already exists - -#if ZEN_PLATFORM_WINDOWS - CAtlFile PayloadFile; - - HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the - // content we were about to insert - - bool IsNew = false; - { - RwLock::ExclusiveLockScope _(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - PayloadFile.Close(); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - if (access(Name.ShardedPath.c_str(), F_OK) == 0) - { - return CasStore::InsertResult{.New = false}; - } -#endif - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - // For now, use double-checked locking to see if someone else was first - - hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - // If we succeeded in opening the file then and the size is correct we don't need to do anything - // else because someone else managed to create the file before we did. Just return. - - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - else - { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - } - } - - if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) - { - ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); - } - - auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; - - hRes = InternalCreateFile(); - - if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) - { - // Ensure parent directories exist and retry file creation - CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); - hRes = InternalCreateFile(); - } - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); - } -#else - // Attempt to exclusively create the file. - auto InternalCreateFile = [&] { - int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); - if (Fd >= 0) - { - fchmod(Fd, 0666); - } - return Fd; - }; - int Fd = InternalCreateFile(); - if (Fd < 0) - { - switch (zen::GetLastError()) - { - case EEXIST: - // Another thread has beat us to it so we're golden. - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return {.New = IsNew}; - } - break; - - case ENOENT: - if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) - { - Fd = InternalCreateFile(); - if (Fd >= 0) - { - break; - } - } - ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); - - default: - ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); - } - } - - struct FdWrapper - { - ~FdWrapper() { Close(); } - void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } - void Close() - { - if (Fd >= 0) - { - close(Fd); - Fd = -1; - } - } - int Fd; - } PayloadFile = {Fd}; -#endif // ZEN_PLATFORM_WINDOWS - - size_t ChunkRemain = ChunkSize; - auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData); - - while (ChunkRemain != 0) - { - uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain)); - - PayloadFile.Write(ChunkCursor, ByteCount); - - ChunkCursor += ByteCount; - ChunkRemain -= ByteCount; - } - - // We cannot rely on RAII to close the file handle since it would be closed - // *after* the lock is released due to the initialization order - PayloadFile.Close(); - - m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); - - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - - return {.New = IsNew}; -} - -IoBuffer -FileCasStrategy::FindChunk(const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - { - RwLock::SharedLockScope _(m_Lock); - if (!m_Index.contains(ChunkHash)) - { - return {}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::SharedLockScope _(LockForHash(ChunkHash)); - - return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); -} - -bool -FileCasStrategy::HaveChunk(const IoHash& ChunkHash) -{ - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - return m_Index.contains(ChunkHash); -} - -void -FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) -{ - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); - if (Ec) - { - ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); - FileSize = 0; - } - - ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); - std::filesystem::remove(Name.ShardedPath.c_str(), Ec); - - if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) - { - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) - { - m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); - m_Index.erase(It); - } - } - m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); - } -} - -void -FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) -{ - ZEN_ASSERT(m_IsInitialized); - - // NOTE: it's not a problem now, but in the future if a GC should happen while this - // is in flight, the result could be wrong since chunks could go away in the meantime. - // - // It would be good to have a pinning mechanism to make this less likely but - // given that chunks could go away at any point after the results are returned to - // a caller, this is something which needs to be taken into account by anyone consuming - // this functionality in any case - - InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); -} - -void -FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback) -{ - ZEN_ASSERT(m_IsInitialized); - - RwLock::SharedLockScope _(m_Lock); - for (const auto& It : m_Index) - { - const IoHash& NameHash = It.first; - ShardingHelper Name(m_RootDirectory.c_str(), NameHash); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); - Callback(NameHash, std::move(Payload)); - } -} - -void -FileCasStrategy::Flush() -{ - // Since we don't keep files open after writing there's nothing specific - // to flush here. - // - // Depending on what semantics we want Flush() to provide, it could be - // argued that this should just flush the volume which we are using to - // store the CAS files on here, to ensure metadata is flushed along - // with file data - // - // Related: to facilitate more targeted validation during recovery we could - // maintain a log of when chunks were created -} - -void -FileCasStrategy::Scrub(ScrubContext& Ctx) -{ - ZEN_ASSERT(m_IsInitialized); - - std::vector<IoHash> BadHashes; - uint64_t ChunkCount{0}, ChunkBytes{0}; - - { - std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); - RwLock::ExclusiveLockScope _(m_Lock); - for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) - { - if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); - m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); - } - } - } - - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - if (!Payload) - { - BadHashes.push_back(Hash); - return; - } - ++ChunkCount; - ChunkBytes += Payload.GetSize(); - - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(Payload, RawHash, RawSize)) - { - if (RawHash != Hash) - { - // Hash mismatch - BadHashes.push_back(Hash); - return; - } - return; - } -#if ZEN_WITH_TESTS - IoHash ComputedHash = IoHash::HashBuffer(CompositeBuffer(SharedBuffer(std::move(Payload)))); - if (ComputedHash == Hash) - { - return; - } -#endif - BadHashes.push_back(Hash); - }); - - Ctx.ReportScrubbed(ChunkCount, ChunkBytes); - - if (!BadHashes.empty()) - { - ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); - - if (Ctx.RunRecovery()) - { - ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); - - for (const IoHash& Hash : BadHashes) - { - std::error_code Ec; - DeleteChunk(Hash, Ec); - - if (Ec) - { - ZEN_WARN("failed to delete file for chunk {}", Hash); - } - } - } - } - - // Let whomever it concerns know about the bad chunks. This could - // be used to invalidate higher level data structures more efficiently - // than a full validation pass might be able to do - Ctx.ReportBadCidChunks(BadHashes); - - ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes)); -} - -void -FileCasStrategy::CollectGarbage(GcContext& GcCtx) -{ - ZEN_ASSERT(m_IsInitialized); - - ZEN_DEBUG("collecting garbage from {}", m_RootDirectory); - - std::vector<IoHash> ChunksToDelete; - std::atomic<uint64_t> ChunksToDeleteBytes{0}; - std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; - - std::vector<IoHash> CandidateCas; - CandidateCas.resize(1); - - uint64_t DeletedCount = 0; - uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); - - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", - m_RootDirectory, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - DeletedCount, - ChunkCount, - NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), - NiceBytes(OldTotalSize)); - }); - - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { - bool KeepThis = false; - CandidateCas[0] = Hash; - GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { - ZEN_UNUSED(Hash); - KeepThis = true; - }); - - const uint64_t FileSize = Payload.GetSize(); - - if (!KeepThis) - { - ChunksToDelete.push_back(Hash); - ChunksToDeleteBytes.fetch_add(FileSize); - } - - ++ChunkCount; - ChunkBytes.fetch_add(FileSize); - }); - - // TODO, any entires we did not encounter during our IterateChunks should be removed from the index - - if (ChunksToDelete.empty()) - { - ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); - return; - } - - ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})", - m_RootDirectory, - ChunksToDelete.size(), - ChunkCount.load(), - NiceBytes(ChunksToDeleteBytes)); - - if (GcCtx.IsDeletionMode() == false) - { - ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled"); - - return; - } - - for (const IoHash& Hash : ChunksToDelete) - { - ZEN_TRACE("deleting chunk {}", Hash); - - std::error_code Ec; - DeleteChunk(Hash, Ec); - - if (Ec) - { - ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message()); - continue; - } - DeletedCount++; - } - - GcCtx.AddDeletedCids(ChunksToDelete); -} - -bool -FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) -{ - if (Entry.Key == IoHash::Zero) - { - OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); - return false; - } - if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) - { - OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); - return false; - } - if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) - { - return true; - } - uint64_t Size = Entry.Size; - if (Size == 0) - { - OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); - return false; - } - return true; -} - -void -FileCasStrategy::MakeIndexSnapshot() -{ - using namespace filecas::impl; - - uint64_t LogCount = m_CasLog.GetLogCount(); - if (m_LogFlushPosition == LogCount) - { - return; - } - ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", - m_RootDirectory, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - namespace fs = std::filesystem; - - fs::path IndexPath = GetIndexPath(m_RootDirectory); - fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - try - { - // Write the current state of the location map to a new index state - std::vector<FileCasIndexEntry> Entries; - - { - Entries.resize(m_Index.size()); - - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Size = Entry.second.Size; - } - } - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = LogCount}; - - Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); - - ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_ERROR("snapshot FAILED, reason: '{}'", Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(IndexPath); - fs::rename(STmpIndexPath, IndexPath); - } - } - if (fs::is_regular_file(STmpIndexPath)) - { - fs::remove(STmpIndexPath); - } -} -uint64_t -FileCasStrategy::ReadIndexFile() -{ - using namespace filecas::impl; - - std::vector<FileCasIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); - if (std::filesystem::is_regular_file(IndexPath)) - { - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - Entries.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(FileCasIndexHeader)) - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); - FileCasIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && - (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) - { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); - - std::string InvalidEntryReason; - for (const FileCasIndexEntry& Entry : Entries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); - } - - return Header.LogPosition; - } - else - { - ZEN_WARN("skipping invalid index file '{}'", IndexPath); - } - } - return 0; - } - - if (std::filesystem::is_directory(m_RootDirectory)) - { - ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); - TCasLogFile<FileCasIndexEntry> CasLog; - uint64_t TotalSize = 0; - Stopwatch TotalTimer; - const auto _ = MakeGuard([&] { - ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", - m_RootDirectory, - NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), - CasLog.GetLogCount(), - NiceBytes(TotalSize)); - }); - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - - std::vector<FileCasStrategy::FileCasIndexEntry> ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); - CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); - std::string InvalidEntryReason; - for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) - { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); - continue; - } - m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); - CasLog.Append(Entry); - } - - CasLog.Close(); - } - - return 0; -} - -uint64_t -FileCasStrategy::ReadLog(uint64_t SkipEntryCount) -{ - using namespace filecas::impl; - - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); - if (std::filesystem::is_regular_file(LogPath)) - { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<FileCasIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - m_Index.reserve(LogEntryCount); - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const FileCasIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Flags & FileCasIndexEntry::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); - }, - SkipEntryCount); - if (InvalidEntryCount) - { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath); - } - return LogEntryCount; - } - } - return 0; -} - -std::vector<FileCasStrategy::FileCasIndexEntry> -FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) -{ - using namespace filecas::impl; - - std::vector<FileCasIndexEntry> Entries; - struct Visitor : public FileSystemTraversal::TreeVisitor - { - Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override - { - std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); - - std::filesystem::path::string_type PathString = RelPath.native(); - - if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) - { - if (PathString.at(3) == std::filesystem::path::preferred_separator) - { - PathString.erase(3, 1); - } - PathString.append(File); - - // TODO: should validate that we're actually dealing with a valid hex string here -#if ZEN_PLATFORM_WINDOWS - StringBuilder<64> Utf8; - WideToUtf8(PathString, Utf8); - IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); -#else - IoHash NameHash = IoHash::FromHexString(PathString); -#endif - Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); - } - } - - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - return true; - } - - const std::filesystem::path& RootDirectory; - std::vector<FileCasIndexEntry>& Entries; - } CasVisitor{RootDir, Entries}; - - FileSystemTraversal Traversal; - Traversal.TraverseFileSystem(RootDir, CasVisitor); - return Entries; -}; - - ////////////////////////////////////////////////////////////////////////// - -#if ZEN_WITH_TESTS - -TEST_CASE("cas.file.move") -{ - // specifying an absolute path here can be helpful when using procmon to dig into things - ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - - GcManager Gc; - - FileCasStrategy FileCas(Gc); - FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); - - { - std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; - - IoBuffer ZeroBytes{1024 * 1024}; - IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); - - BasicFile PayloadFile; - PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); - PayloadFile.Write(ZeroBytes, 0); - PayloadFile.Close(); - - IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); - - CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash); - CHECK_EQ(Result.New, true); - } - -# if 0 - SUBCASE("stresstest") - { - std::vector<IoHash> PayloadHashes; - - const int kWorkers = 64; - const int kItemCount = 128; - - for (int w = 0; w < kWorkers; ++w) - { - for (int i = 0; i < kItemCount; ++i) - { - IoBuffer Payload{1024}; - *reinterpret_cast<int*>(Payload.MutableData()) = i; - PayloadHashes.push_back(IoHash::HashBuffer(Payload)); - - std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; - WriteFile(PayloadPath, Payload); - } - } - - std::barrier Sync{kWorkers}; - - auto PopulateAll = [&](int w) { - std::vector<IoBuffer> Buffers; - - for (int i = 0; i < kItemCount; ++i) - { - std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; - IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath); - Buffers.push_back(Payload); - Sync.arrive_and_wait(); - CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]); - } - }; - - std::vector<std::jthread> Threads; - - for (int i = 0; i < kWorkers; ++i) - { - Threads.push_back(std::jthread(PopulateAll, i)); - } - - for (std::jthread& Thread : Threads) - { - Thread.join(); - } - } -# endif -} - -TEST_CASE("cas.file.gc") -{ - // specifying an absolute path here can be helpful when using procmon to dig into things - ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; - - GcManager Gc; - FileCasStrategy FileCas(Gc); - FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); - - const int kIterationCount = 1000; - std::vector<IoHash> Keys{kIterationCount}; - - auto InsertChunks = [&] { - for (int i = 0; i < kIterationCount; ++i) - { - CbObjectWriter Cbo; - Cbo << "id" << i; - CbObject Obj = Cbo.Save(); - - IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); - IoHash Hash = HashBuffer(ObjBuffer); - - FileCas.InsertChunk(ObjBuffer, Hash); - - Keys[i] = Hash; - } - }; - - // Drop everything - - { - InsertChunks(); - - GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); - FileCas.CollectGarbage(Ctx); - - for (const IoHash& Key : Keys) - { - IoBuffer Chunk = FileCas.FindChunk(Key); - - CHECK(!Chunk); - } - } - - // Keep roughly half of the chunks - - { - InsertChunks(); - - GcContext Ctx(GcClock::Now() - std::chrono::hours(24)); - - for (const IoHash& Key : Keys) - { - if (Key.Hash[0] & 1) - { - Ctx.AddRetainedCids(std::vector<IoHash>{Key}); - } - } - - FileCas.CollectGarbage(Ctx); - - for (const IoHash& Key : Keys) - { - if (Key.Hash[0] & 1) - { - CHECK(FileCas.FindChunk(Key)); - } - else - { - CHECK(!FileCas.FindChunk(Key)); - } - } - } -} - -#endif - -void -filecas_forcelink() -{ -} - -} // namespace zen |