// Copyright Epic Games, Inc. All Rights Reserved. #include "filecas.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include #endif #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #if ZEN_PLATFORM_WINDOWS # include #else # include # include # include #endif ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { template void Reset(T& V) { T Tmp; V.swap(Tmp); } } // namespace namespace filecas::impl { const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas.tmp{}", IndexExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } #pragma pack(push) #pragma pack(1) struct FileCasIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t Reserved = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(FileCasIndexHeader) == 32); #pragma pack(pop) } // namespace filecas::impl FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { ShardedPath.Append(RootPath.c_str()); ExtendableStringBuilder<64> HashString; ChunkHash.ToHexString(HashString); const char* str = HashString.c_str(); // Shard into a path with two directory levels containing 12 bits and 8 bits // 
respectively. // // This results in a maximum of 4096 * 256 directories // // The numbers have been chosen somewhat arbitrarily but are large to scale // to very large chunk repositories without creating too many directories // on a single level since NTFS does not deal very well with this. // // It may or may not make sense to make this a configurable policy, and it // would probably be a good idea to measure performance for different // policies and chunk counts ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str, str + 3); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 3, str + 5); Shard2len = ShardedPath.Size(); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 5, str + 40); } ////////////////////////////////////////////////////////////////////////// static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc) { m_Index.min_load_factor(IndexMinLoadFactor); m_Index.max_load_factor(IndexMaxLoadFactor); m_Gc.AddGcStorage(this); m_Gc.AddGcReferenceStore(*this); } FileCasStrategy::~FileCasStrategy() { m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } void FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) { ZEN_TRACE_CPU("FileCas::Initialize"); using namespace filecas::impl; m_IsInitialized = true; m_RootDirectory = RootDirectory; m_Index.clear(); std::filesystem::path LogPath = GetLogPath(m_RootDirectory); std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); if (IsNewStore) { std::filesystem::remove(LogPath); std::filesystem::remove(IndexPath); if (std::filesystem::is_directory(m_RootDirectory)) { // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject cas folders may reside // in this folder as well struct Visitor : public FileSystemTraversal::TreeVisitor { virtual void 
VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override { // We don't care about files } static bool IsHexChar(std::filesystem::path::value_type C) { return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; } virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, [[maybe_unused]] const path_view& DirectoryName) override { if (DirectoryName.length() == 3) { if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) { ShardedRoots.push_back(Parent / DirectoryName); } } return false; } std::vector ShardedRoots; } CasVisitor; FileSystemTraversal Traversal; Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) { std::filesystem::remove_all(SharededRoot); } } } if (std::filesystem::is_regular_file(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); std::filesystem::remove(IndexPath); } } uint64_t LogEntryCount = 0; if (std::filesystem::is_regular_file(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); std::filesystem::remove(LogPath); } } for (const auto& Entry : m_Index) { m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); } CreateDirectories(m_RootDirectory); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); if (IsNewStore || LogEntryCount > 0) { MakeIndexSnapshot(); } } CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { ZEN_TRACE_CPU("FileCas::InsertChunk"); ZEN_ASSERT(m_IsInitialized); #if !ZEN_WITH_TESTS ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif if (Mode == CasStore::InsertMode::kCopyOnly) { { RwLock::SharedLockScope 
_(m_Lock); if (m_Index.contains(ChunkHash)) { return CasStore::InsertResult{.New = false}; } } return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash); } // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; bool IsWholeFile = Chunk.IsWholeFile(); if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef)) { ZEN_TRACE_CPU("FileCas::InsertChunk::Move"); { bool Exists = true; { RwLock::SharedLockScope _(m_Lock); Exists = m_Index.contains(ChunkHash); } if (Exists) { return CasStore::InsertResult{.New = false}; } } ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS const HANDLE ChunkFileHandle = FileRef.FileHandle; // See if file already exists { ZEN_TRACE_CPU("FileCas::InsertChunk::Exists"); windows::FileHandle PayloadFile; if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) { // If we succeeded in opening the target file then we don't need to do anything else because it already exists // and should contain the content we were about to insert // We do need to ensure the source file goes away on close, however size_t ChunkSize = Chunk.GetSize(); uint64_t FileSize = 0; if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) { HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } else { ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. 
Trying to overwrite", Name.ShardedPath.ToUtf8(), ChunkSize, FileSize); } } else { if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { // Shard directory does not exist } else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) { // Shard directory exists, but not the file } else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) { // Sharing violation, likely because we are trying to open a file // which has been renamed on another thread, and the file handle // used to rename it is still open. We handle this case below // instead of here } else { ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); } } } std::filesystem::path FullPath(Name.ShardedPath.c_str()); std::filesystem::path FilePath = FullPath.parent_path(); std::wstring FileName = FullPath.native(); const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow(FileName.size() * sizeof(WCHAR)); FILE_RENAME_INFO* RenameInfo = reinterpret_cast(Memory::Alloc(BufferSize)); memset(RenameInfo, 0, BufferSize); RenameInfo->ReplaceIfExists = FALSE; RenameInfo->FileNameLength = gsl::narrow(FileName.size()); memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); RenameInfo->FileName[FileName.size()] = 0; auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); // Try to move file into place BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); if (!Success) { // The rename/move could fail because the target directory does not yet exist. This code attempts // to create it windows::FileHandle DirHandle; auto InternalCreateDirectoryHandle = [&] { return DirHandle.Create(FilePath.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS); }; // It's possible for several threads to enter this logic trying to create the same // directory. 
Only one will create the directory of course, but all threads will // make it through okay HRESULT hRes = InternalCreateDirectoryHandle(); if (FAILED(hRes)) { // TODO: we can handle directory creation more intelligently and efficiently than // this currently does CreateDirectories(FilePath.c_str()); hRes = InternalCreateDirectoryHandle(); } if (FAILED(hRes)) { ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); } // Retry rename/move Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); } if (Success) { m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); Chunk.SetDeleteOnClose(false); HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } const DWORD LastError = GetLastError(); if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) { HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LastError), ChunkHash); #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); std::filesystem::path DestPath = Name.ShardedPath.c_str(); int Ret = link(SourcePath.c_str(), DestPath.c_str()); if (Ret < 0 && zen::GetLastError() == ENOENT) { // Destination directory doesn't exist. Create it any try again. 
CreateDirectories(DestPath.parent_path().c_str()); Ret = link(SourcePath.c_str(), DestPath.c_str()); } if (Ret == 0) { m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); Chunk.SetDeleteOnClose(false); HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } else { int LinkError = zen::GetLastError(); // It is possible that someone beat us to it in linking the file. In that // case a "file exists" error is okay. All others are not. if (LinkError == EEXIST) { HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; } if (IsNew) { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LinkError), ChunkHash); } #endif // ZEN_PLATFORM_* } return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash); } CasStore::InsertResult FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { ZEN_TRACE_CPU("FileCas::InsertChunkData"); ZEN_ASSERT(m_IsInitialized); { RwLock::SharedLockScope _(m_Lock); if (m_Index.contains(ChunkHash)) { return {.New = false}; } } ZEN_TRACE_CPU("FileCas::InsertChunkData::Write"); ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists #if ZEN_PLATFORM_WINDOWS windows::FileHandle PayloadFile; HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the // 
content we were about to insert bool IsNew = false; { RwLock::ExclusiveLockScope _(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } PayloadFile.Close(); #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC if (access(Name.ShardedPath.c_str(), F_OK) == 0) { return CasStore::InsertResult{.New = false}; } #endif RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS // For now, use double-checked locking to see if someone else was first hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { uint64_t FileSize = 0; if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) { // If we succeeded in opening the file then and the size is correct we don't need to do anything // else because someone else managed to create the file before we did. Just return. HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); } return CasStore::InsertResult{.New = IsNew}; } else { ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. 
Trying to overwrite", Name.ShardedPath.ToUtf8(), ChunkSize, FileSize); } } if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) { ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); } auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; hRes = InternalCreateFile(); if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { // Ensure parent directories exist and retry file creation CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); hRes = InternalCreateFile(); } if (FAILED(hRes)) { ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); } #else // Attempt to exclusively create the file. auto InternalCreateFile = [&] { int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); if (Fd >= 0) { fchmod(Fd, 0666); } return Fd; }; int Fd = InternalCreateFile(); if (Fd < 0) { switch (zen::GetLastError()) { case EEXIST: // Another thread has beat us to it so we're golden. 
{ HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); } return {.New = IsNew}; } break; case ENOENT: if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) { Fd = InternalCreateFile(); if (Fd >= 0) { break; } } ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); default: ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); } } struct FdWrapper { ~FdWrapper() { Close(); } void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } void Close() { if (Fd >= 0) { close(Fd); Fd = -1; } } int Fd; } PayloadFile = {Fd}; #endif // ZEN_PLATFORM_WINDOWS size_t ChunkRemain = ChunkSize; auto ChunkCursor = reinterpret_cast(ChunkData); while (ChunkRemain != 0) { uint32_t ByteCount = uint32_t(std::min(4 * 1024 * 1024ull, ChunkRemain)); PayloadFile.Write(ChunkCursor, ByteCount); ChunkCursor += ByteCount; ChunkRemain -= ByteCount; } // We cannot rely on RAII to close the file handle since it would be closed // *after* the lock is released due to the initialization order PayloadFile.Close(); m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); HashLock.ReleaseNow(); bool IsNew = false; { RwLock::ExclusiveLockScope __(m_Lock); IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; } if (IsNew) { m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); } return {.New = IsNew}; } IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FileCas::FindChunk"); ZEN_ASSERT(m_IsInitialized); uint64_t ExpectedSize = 0; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { ExpectedSize = It->second.Size; } else { return {}; } } ShardingHelper 
Name(m_RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); Chunk.GetSize() == ExpectedSize) { return Chunk; } return {}; } bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); RwLock::SharedLockScope _(m_Lock); return m_Index.contains(ChunkHash); } void FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { ZEN_TRACE_CPU("FileCas::DeleteChunk"); ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); uint64_t FileSize = static_cast(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); if (Ec) { ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); FileSize = 0; } ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); std::filesystem::remove(Name.ShardedPath.c_str(), Ec); if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) { { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); m_Index.erase(It); } } m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); } } void FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) { ZEN_ASSERT(m_IsInitialized); // NOTE: it's not a problem now, but in the future if a GC should happen while this // is in flight, the result could be wrong since chunks could go away in the meantime. 
// // It would be good to have a pinning mechanism to make this less likely but // given that chunks could go away at any point after the results are returned to // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void FileCasStrategy::IterateChunks(std::function&& Callback) { ZEN_TRACE_CPU("FileCas::IterateChunks"); ZEN_ASSERT(m_IsInitialized); RwLock::SharedLockScope _(m_Lock); for (const auto& It : m_Index) { const IoHash& NameHash = It.first; ShardingHelper Name(m_RootDirectory.c_str(), NameHash); IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); Callback(NameHash, std::move(Payload)); } } void FileCasStrategy::IterateChunks(std::function&& Callback) { ZEN_TRACE_CPU("FileCas::IterateChunks"); ZEN_ASSERT(m_IsInitialized); RwLock::SharedLockScope _(m_Lock); for (const auto& It : m_Index) { const IoHash& NameHash = It.first; Callback(NameHash, It.second.Size); } } void FileCasStrategy::Flush() { ZEN_TRACE_CPU("FileCas::Flush"); m_CasLog.Flush(); MakeIndexSnapshot(); } void FileCasStrategy::ScrubStorage(ScrubContext& Ctx) { ZEN_TRACE_CPU("FileCas::ScrubStorage"); if (Ctx.IsSkipCas()) { ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory); return; } Stopwatch Timer; ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory); ZEN_ASSERT(m_IsInitialized); std::vector BadHashes; uint64_t ChunkCount{0}, ChunkBytes{0}; int DiscoveredFilesNotInIndex = 0; { std::vector ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); RwLock::ExclusiveLockScope _(m_Lock); for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) { m_TotalSize.fetch_add(static_cast(Entry.Size), std::memory_order::relaxed); m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); ++DiscoveredFilesNotInIndex; } } } ZEN_INFO("discovered {} 
files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, DiscoveredFilesNotInIndex); IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { if (!Payload) { BadHashes.push_back(Hash); return; } ++ChunkCount; ChunkBytes += Payload.GetSize(); IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload); IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize)) { if (RawHash == Hash) { // Header hash matches the file name, full validation requires that // we check that the decompressed data hash also matches CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer)); OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { if (BlockSize == 0) { BlockSize = 256 * 1024; } else if (BlockSize < (1024 * 1024)) { BlockSize = BlockSize * (1024 * 1024 / BlockSize); } std::unique_ptr DecompressionBuffer(new uint8_t[BlockSize]); IoHashStream Hasher; uint64_t RawOffset = 0; while (RawSize) { const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize); bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize), RawOffset); if (Ok) { Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize); } RawSize -= DecompressedBlockSize; RawOffset += DecompressedBlockSize; } const IoHash FinalHash = Hasher.GetHash(); if (FinalHash == Hash) { // all good return; } } } } BadHashes.push_back(Hash); }); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadHashes.empty()) { ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory); if (Ctx.RunRecovery()) { ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); for (const IoHash& Hash : BadHashes) { std::error_code Ec; DeleteChunk(Hash, Ec); 
if (Ec) { ZEN_WARN("failed to delete file for chunk {}: {}", Hash, Ec.message()); } } } else { ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size()); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCidChunks(BadHashes); ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}", m_RootDirectory, ChunkCount, NiceBytes(ChunkBytes), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } void FileCasStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("FileCas::CollectGarbage"); ZEN_ASSERT(m_IsInitialized); if (GcCtx.SkipCid()) { return; } ZEN_DEBUG("collecting garbage from {}", m_RootDirectory); std::vector ChunksToDelete; std::atomic ChunksToDeleteBytes{0}; std::atomic ChunkCount{0}, ChunkBytes{0}; std::vector CandidateCas; CandidateCas.resize(1); uint64_t DeletedCount = 0; uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_DEBUG("garbage collect for '{}' DONE after {}, deleted {} out of {} files, removed {} out of {}", m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), DeletedCount, ChunkCount.load(), NiceBytes(OldTotalSize - m_TotalSize.load(std::memory_order::relaxed)), NiceBytes(OldTotalSize)); }); { ZEN_TRACE_CPU("FileCas::CollectGarbage::Filter"); IterateChunks([&](const IoHash& Hash, uint64_t Size) { bool KeepThis = false; CandidateCas[0] = Hash; GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); if (!KeepThis) { ChunksToDelete.push_back(Hash); ChunksToDeleteBytes.fetch_add(Size); } ++ChunkCount; ChunkBytes.fetch_add(Size); }); } // TODO, any entires we did not encounter during our IterateChunks should be removed from the index if (ChunksToDelete.empty()) { ZEN_DEBUG("gc for '{}' SKIPPED, nothing to delete", m_RootDirectory); return; } 
// CollectGarbage, continued: everything below runs only when at least one chunk was selected for deletion.
ZEN_DEBUG("deleting file CAS garbage for '{}': {} out of {} chunks ({})",
              m_RootDirectory,
              ChunksToDelete.size(),
              ChunkCount.load(),
              NiceBytes(ChunksToDeleteBytes));

    // Dry run: report what would be deleted, but leave the store untouched.
    if (GcCtx.IsDeletionMode() == false)
    {
        ZEN_DEBUG("NOTE: not actually deleting anything since deletion is disabled");

        return;
    }

    for (const IoHash& Hash : ChunksToDelete)
    {
        ZEN_TRACE("deleting chunk {}", Hash);

        std::error_code Ec;
        DeleteChunk(Hash, Ec);

        if (Ec)
        {
            // A failed deletion is logged and not counted; the loop carries on with the next chunk.
            ZEN_WARN("gc for '{}' failed to delete file for chunk {}: '{}'", m_RootDirectory, Hash, Ec.message());
            continue;
        }
        DeletedCount++;
    }

    // NOTE(review): the complete candidate list is reported here, including hashes whose deletion
    // failed above - confirm that this is what the GC context expects.
    GcCtx.AddDeletedCids(ChunksToDelete);
}

/** Reports the storage footprint of this store: DiskSize is the current value of m_TotalSize. */
GcStorageSize
FileCasStrategy::StorageSize() const
{
    return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)};
}

/** Checks a single index/log record for plausibility.
 *
 *  @param Entry      Record to check.
 *  @param OutReason  Written only when the record is rejected; describes why.
 *  @return true when the record is acceptable, false otherwise.
 *
 *  A record is rejected when its key is the all-zero hash, when any flag bit other than kTombStone
 *  is set, or when it is not a tombstone and has a size of zero.
 */
bool
FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason)
{
    if (Entry.Key == IoHash::Zero)
    {
        OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
        return false;
    }
    // kTombStone is the only flag accepted here
    if (Entry.Flags & (~FileCasIndexEntry::kTombStone))
    {
        OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString());
        return false;
    }
    // Tombstones are accepted without looking at the size field
    if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
    {
        return true;
    }
    // A live entry must describe a non-empty payload
    uint64_t Size = Entry.Size;
    if (Size == 0)
    {
        OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
        return false;
    }
    return true;
}

// MakeIndexSnapshot: only the opening part of the function is on this physical line; the body
// continues on the following lines.
void
FileCasStrategy::MakeIndexSnapshot()
{
    ZEN_TRACE_CPU("FileCas::MakeIndexSnapshot");

    using namespace filecas::impl;

    // Nothing to do when the log count still equals the position recorded in m_LogFlushPosition.
    uint64_t LogCount = m_CasLog.GetLogCount();
    if (m_LogFlushPosition == LogCount)
    {
        return;
    }
    ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory);
    uint64_t   EntryCount = 0;
    Stopwatch  Timer;
    // Logs the summary line when the function is left, whichever way that happens.
    const auto _ = MakeGuard([&] {
        ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}",
                 m_RootDirectory,
                 EntryCount,
                 NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });

    namespace fs = std::filesystem;

    fs::path IndexPath     = GetIndexPath(m_RootDirectory);
    fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory);

    // Move index away, we
keep it if something goes wrong if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; if (!fs::remove(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); return; } } try { if (fs::is_regular_file(IndexPath)) { fs::rename(IndexPath, STmpIndexPath); } // Write the current state of the location map to a new index state std::vector Entries; uint64_t IndexLogPosition = 0; { RwLock::SharedLockScope __(m_Lock); IndexLogPosition = m_CasLog.GetLogCount(); Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; for (const auto& Entry : m_Index) { FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Size = Entry.second.Size; } } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = IndexLogPosition}; Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); } EntryCount = Entries.size(); m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless fs::rename(STmpIndexPath, IndexPath, Ec); if 
(Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } if (fs::is_regular_file(STmpIndexPath)) { std::error_code Ec; if (!fs::remove(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } uint64_t FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("FileCas::ReadIndexFile"); using namespace filecas::impl; std::vector Entries; if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(FileCasIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); FileCasIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); std::string InvalidEntryReason; for (const FileCasIndexEntry& Entry : Entries) { if (!ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); } OutVersion = FileCasIndexHeader::CurrentVersion; return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } return 0; } if 
(std::filesystem::is_directory(m_RootDirectory)) { ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); TCasLogFile CasLog; uint64_t TotalSize = 0; Stopwatch TotalTimer; const auto _ = MakeGuard([&] { ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), CasLog.GetLogCount(), NiceBytes(TotalSize)); }); std::filesystem::path LogPath = GetLogPath(m_RootDirectory); std::vector ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); std::string InvalidEntryReason; for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); TotalSize += Entry.Size; CasLog.Append(Entry); } CasLog.Close(); } return 0; } uint64_t FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("FileCas::ReadLog"); using namespace filecas::impl; if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; m_Index.reserve(LogEntryCount); uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const FileCasIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Flags & FileCasIndexEntry::kTombStone) { m_Index.erase(Record.Key); return; } if (!ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", 
LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, LogPath); } return LogEntryCount; } } return 0; } std::vector FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) { ZEN_TRACE_CPU("FileCas::ScanFolderForCasFiles"); using namespace filecas::impl; std::vector Entries; struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir, std::vector& Entries) : RootDirectory(RootDir), Entries(Entries) {} virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override { std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); std::filesystem::path::string_type PathString = RelPath.native(); if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) { if (PathString.at(3) == std::filesystem::path::preferred_separator) { PathString.erase(3, 1); } PathString.append(File); // TODO: should validate that we're actually dealing with a valid hex string here #if ZEN_PLATFORM_WINDOWS StringBuilder<64> Utf8; WideToUtf8(PathString, Utf8); IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); #else IoHash NameHash = IoHash::FromHexString(PathString); #endif Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); } } virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, [[maybe_unused]] const path_view& DirectoryName) override { return true; } const std::filesystem::path& RootDirectory; std::vector& Entries; } CasVisitor{RootDir, Entries}; FileSystemTraversal Traversal; Traversal.TraverseFileSystem(RootDir, CasVisitor); return Entries; }; class FileCasStoreCompactor : public GcStoreCompactor { public: FileCasStoreCompactor(FileCasStrategy& Owner, std::vector&& ReferencesToClean) : 
m_FileCasStrategy(Owner) , m_ReferencesToClean(std::move(ReferencesToClean)) { m_ReferencesToClean.shrink_to_fit(); } virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function&) { ZEN_TRACE_CPU("FileCas::CompactStore"); Stopwatch Timer; const auto _ = MakeGuard([&] { Reset(m_ReferencesToClean); if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: filecas [COMPACT] '{}': RemovedDisk: {} in {}", m_FileCasStrategy.m_RootDirectory, NiceBytes(Stats.RemovedDisk), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); ZEN_INFO("GCV2: filecas [COMPACT] '{}': {} {} files", m_FileCasStrategy.m_RootDirectory, Ctx.Settings.IsDeleteMode ? "Removing" : "Checking", m_ReferencesToClean.size()); size_t Skipped = 0; for (const IoHash& ChunkHash : m_ReferencesToClean) { FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash); { RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock); if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end()) { // Not regarded as pruned, leave it be continue; } if (Ctx.IsCancelledFlag.load()) { return; } if (Ctx.Settings.IsDeleteMode) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'", m_FileCasStrategy.m_RootDirectory, Name.ShardedPath.ToUtf8()); } std::error_code Ec; uint64_t SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec); if (Ec) { SizeOnDisk = 0; } bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec); if (Ec) { ZEN_WARN("GCV2: filecas [COMPACT] '{}': Failed deleting CAS payload file '{}'. Reason '{}'", m_FileCasStrategy.m_RootDirectory, Name.ShardedPath.ToUtf8(), Ec.message()); continue; } if (!Existed) { continue; } Stats.RemovedDisk += SizeOnDisk; } else { std::error_code Ec; bool Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec); if (Ec) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: filecas [COMPACT] '{}': Failed checking CAS payload file '{}'. 
Reason '{}'", m_FileCasStrategy.m_RootDirectory, Name.ShardedPath.ToUtf8(), Ec.message()); } continue; } if (!Existed) { continue; } Skipped++; } } } if (Skipped > 0) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: filecas [COMPACT] '{}': Skipped deleting of {} eligible files", m_FileCasStrategy.m_RootDirectory, Skipped); } } Reset(m_ReferencesToClean); } private: FileCasStrategy& m_FileCasStrategy; std::vector m_ReferencesToClean; }; class FileCasReferencePruner : public GcReferencePruner { public: FileCasReferencePruner(FileCasStrategy& Owner, std::vector&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {} virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); } virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) override { ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData"); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: filecas [PRUNE] '{}': Count: {}, Unreferenced: {}, FreedMemory: {} in {}", m_FileCasStrategy.m_RootDirectory, Stats.CheckedCount, Stats.FoundCount, NiceBytes(Stats.FreedMemory), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector UnusedCids = GetUnusedReferences(m_Cids); Stats.CheckedCount = m_Cids.size(); Stats.FoundCount = UnusedCids.size(); if (UnusedCids.empty()) { // Nothing to collect return nullptr; } if (Ctx.Settings.IsDeleteMode) { std::vector ExpiredEntries; ExpiredEntries.reserve(UnusedCids.size()); { RwLock::ExclusiveLockScope __(m_FileCasStrategy.m_Lock); if (Ctx.IsCancelledFlag.load()) { return nullptr; } uint64_t RemovedSize = 0; for (const IoHash& ChunkHash : UnusedCids) { if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end()) { uint64_t FileSize = It->second.Size; ExpiredEntries.push_back( {.Key = ChunkHash, .Flags = FileCasStrategy::FileCasIndexEntry::kTombStone, .Size = FileSize}); RemovedSize += 
FileSize; } } m_FileCasStrategy.m_TotalSize.fetch_sub(RemovedSize, std::memory_order_relaxed); if (!ExpiredEntries.empty()) { for (const FileCasStrategy::FileCasIndexEntry& Entry : ExpiredEntries) { m_FileCasStrategy.m_Index.erase(Entry.Key); Stats.DeletedCount++; } m_FileCasStrategy.m_CasLog.Append(ExpiredEntries); m_FileCasStrategy.m_CasLog.Flush(); } } } return new FileCasStoreCompactor(m_FileCasStrategy, std::move(UnusedCids)); } private: FileCasStrategy& m_FileCasStrategy; std::vector m_Cids; }; std::string FileCasStrategy::GetGcName(GcCtx&) { return fmt::format("filecas: '{}'", m_RootDirectory.string()); } GcReferencePruner* FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) { ZEN_TRACE_CPU("FileCas::CreateReferencePruner"); Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } ZEN_INFO("GCV2: filecas [CREATE PRUNER] '{}' in {}", m_RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); std::vector CidsToCheck; { RwLock::SharedLockScope __(m_Lock); if (m_Index.empty()) { return {}; } if (Ctx.IsCancelledFlag.load()) { return nullptr; } CidsToCheck.reserve(m_Index.size()); for (const auto& It : m_Index) { CidsToCheck.push_back(It.first); } } return new FileCasReferencePruner(*this, std::move(CidsToCheck)); } ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS TEST_CASE("cas.file.move") { // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; GcManager Gc; FileCasStrategy FileCas(Gc); FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); { std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; IoBuffer ZeroBytes{1024 * 1024}; IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes); BasicFile PayloadFile; PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate); PayloadFile.Write(ZeroBytes, 0); PayloadFile.Close(); IoBuffer Payload1 = 
IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash); CHECK_EQ(Result.New, true); } # if 0 SUBCASE("stresstest") { std::vector PayloadHashes; const int kWorkers = 64; const int kItemCount = 128; for (int w = 0; w < kWorkers; ++w) { for (int i = 0; i < kItemCount; ++i) { IoBuffer Payload{1024}; *reinterpret_cast(Payload.MutableData()) = i; PayloadHashes.push_back(IoHash::HashBuffer(Payload)); std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; WriteFile(PayloadPath, Payload); } } std::barrier Sync{kWorkers}; auto PopulateAll = [&](int w) { std::vector Buffers; for (int i = 0; i < kItemCount; ++i) { std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)}; IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath); Buffers.push_back(Payload); Sync.arrive_and_wait(); CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]); } }; std::vector Threads; for (int i = 0; i < kWorkers; ++i) { Threads.push_back(std::jthread(PopulateAll, i)); } for (std::jthread& Thread : Threads) { Thread.join(); } } # endif } TEST_CASE("cas.file.gc") { // specifying an absolute path here can be helpful when using procmon to dig into things ScopedTemporaryDirectory TempDir; // {"d:\\filecas_testdir"}; GcManager Gc; FileCasStrategy FileCas(Gc); FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); const int kIterationCount = 100; std::vector Keys{kIterationCount}; auto InsertChunks = [&] { for (int i = 0; i < kIterationCount; ++i) { CbObjectWriter Cbo; Cbo << "id" << i; CbObject Obj = Cbo.Save(); IoBuffer ObjBuffer = Obj.GetBuffer().AsIoBuffer(); IoHash Hash = HashBuffer(ObjBuffer); FileCas.InsertChunk(ObjBuffer, Hash); Keys[i] = Hash; } }; // Drop everything { InsertChunks(); GcContext Ctx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); FileCas.CollectGarbage(Ctx); for (const 
IoHash& Key : Keys) { IoBuffer Chunk = FileCas.FindChunk(Key); CHECK(!Chunk); } } // Keep roughly half of the chunks { InsertChunks(); GcContext Ctx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); for (const IoHash& Key : Keys) { if (Key.Hash[0] & 1) { Ctx.AddRetainedCids(std::vector{Key}); } } FileCas.CollectGarbage(Ctx); for (const IoHash& Key : Keys) { if (Key.Hash[0] & 1) { CHECK(FileCas.FindChunk(Key)); } else { CHECK(!FileCas.FindChunk(Key)); } } } } #endif void filecas_forcelink() { } } // namespace zen