// Copyright Epic Games, Inc. All Rights Reserved. #include "filecas.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include #endif #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #if ZEN_PLATFORM_WINDOWS # include #else # include # include # include #endif ZEN_THIRD_PARTY_INCLUDES_END #include ////////////////////////////////////////////////////////////////////////// namespace zen { const FLLMTag& GetFileCasTag() { static FLLMTag _("file", FLLMTag("cas")); return _; } namespace filecas::impl { template void Reset(T& V) { T Tmp; V.swap(Tmp); } const char* IndexExtension = ".uidx"; const char* LogExtension = ".ulog"; std::filesystem::path GetIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", IndexExtension); } std::filesystem::path GetTempIndexPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas.tmp{}", IndexExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& RootDir) { return RootDir / fmt::format("cas{}", LogExtension); } #pragma pack(push) #pragma pack(1) struct FileCasIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; static constexpr uint32_t CurrentVersion = 1; uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; uint64_t EntryCount = 0; uint64_t LogPosition = 0; uint32_t Reserved = 0; uint32_t Checksum = 0; static uint32_t ComputeChecksum(const FileCasIndexHeader& Header) { return XXH32(&Header.Magic, sizeof(FileCasIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } }; static_assert(sizeof(FileCasIndexHeader) == 32); #pragma pack(pop) } // namespace filecas::impl FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { ShardedPath.Append(RootPath); ExtendableStringBuilder<64> HashString; 
ChunkHash.ToHexString(HashString); const char* str = HashString.c_str(); // Shard into a path with two directory levels containing 12 bits and 8 bits // respectively. // // This results in a maximum of 4096 * 256 directories // // The numbers have been chosen somewhat arbitrarily but are large to scale // to very large chunk repositories without creating too many directories // on a single level since NTFS does not deal very well with this. // // It may or may not make sense to make this a configurable policy, and it // would probably be a good idea to measure performance for different // policies and chunk counts ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str, str + 3); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 3, str + 5); Shard2len = ShardedPath.Size(); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 5, str + 40); } ////////////////////////////////////////////////////////////////////////// FileCasStrategy::FileCasStrategy(GcManager& Gc) : m_Log(logging::Get("filecas")), m_Gc(Gc) { static const float IndexMinLoadFactor = 0.2f; static const float IndexMaxLoadFactor = 0.7f; m_Index.min_load_factor(IndexMinLoadFactor); m_Index.max_load_factor(IndexMaxLoadFactor); m_Gc.AddGcStorage(this); m_Gc.AddGcReferenceStore(*this); } FileCasStrategy::~FileCasStrategy() { m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } void FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore) { ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::Initialize"); using namespace filecas::impl; m_IsInitialized = true; m_RootDirectory = RootDirectory; m_Index.clear(); std::filesystem::path LogPath = GetLogPath(m_RootDirectory); std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); if (IsNewStore) { RemoveFile(LogPath); RemoveFile(IndexPath); if (IsDir(m_RootDirectory)) { // We need to explicitly only delete sharded root folders as the cas manifest, tinyobject and smallobject 
cas folders may reside // in this folder as well struct Visitor : public FileSystemTraversal::TreeVisitor { virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t, uint64_t) override { // We don't care about files } static bool IsHexChar(std::filesystem::path::value_type C) { return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; } virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t) override { if (DirectoryName.length() == 3) { if (IsHexChar(DirectoryName[0]) && IsHexChar(DirectoryName[1]) && IsHexChar(DirectoryName[2])) { ShardedRoots.push_back(Parent / DirectoryName); } } return false; } std::vector ShardedRoots; } CasVisitor; FileSystemTraversal Traversal; Traversal.TraverseFileSystem(m_RootDirectory, CasVisitor); for (const std::filesystem::path& SharededRoot : CasVisitor.ShardedRoots) { DeleteDirectories(SharededRoot); } } } if (IsFile(IndexPath)) { uint32_t IndexVersion = 0; m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); if (IndexVersion == 0) { ZEN_WARN("removing invalid index file at '{}'", IndexPath); RemoveFile(IndexPath); } } uint64_t LogEntryCount = 0; if (IsFile(LogPath)) { if (TCasLogFile::IsValid(LogPath)) { LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); } else { ZEN_WARN("removing invalid cas log at '{}'", LogPath); RemoveFile(LogPath); } } for (const auto& Entry : m_Index) { m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); } CreateDirectories(m_RootDirectory); m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); if (IsNewStore || LogEntryCount > 0) { MakeIndexSnapshot(); } } bool FileCasStrategy::UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize) { ZEN_MEMSCOPE(GetFileCasTag()); uint64 OldChunkSize = 0; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { OldChunkSize = It->second.Size; if (OldChunkSize == ChunkSize) { return false; } } 
m_Index.insert_or_assign(ChunkHash, IndexEntry{.Size = ChunkSize}); } m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed); m_TotalSize.fetch_add(static_cast(ChunkSize), std::memory_order::relaxed); return true; } CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::InsertChunk"); ZEN_ASSERT(m_IsInitialized); #if !ZEN_WITH_TESTS ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif uint64_t ChunkSize = Chunk.GetSize(); { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { if (It->second.Size == ChunkSize) { return CasStore::InsertResult{.New = false}; } } } ShardingHelper Name(m_RootDirectory, ChunkHash); RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); if (Mode == CasStore::InsertMode::kMayBeMovedInPlace) { // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying if (MoveToFile(ChunkPath, Chunk)) { bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); return CasStore::InsertResult{.New = IsNew}; } } // Target file may be open for read, attempt to move it to a temp file and mark it as delete on close IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); if (OldChunk) { std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); std::error_code Ec; RenameFile(ChunkPath, TempPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("unable to move existing CAS file {} to {}", ChunkPath, TempPath)); } OldChunk.SetDeleteOnClose(true); if (Mode == CasStore::InsertMode::kMayBeMovedInPlace) { if (MoveToFile(ChunkPath, Chunk)) { bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); return CasStore::InsertResult{.New = 
IsNew}; } } } // We moved the file, make sure the index reflects that { uint64 OldChunkSize = (uint64)-1; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { OldChunkSize = It->second.Size; m_Index.erase(It); } } if (OldChunkSize != (uint64)-1) { m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = OldChunkSize}); m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed); } } // We need to do a copy of the source data #if ZEN_PLATFORM_WINDOWS windows::FileHandle PayloadFile; auto InternalCreateFile = [&] { return PayloadFile.Create(ChunkPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; HRESULT hRes = InternalCreateFile(); if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { // Ensure parent directories exist and retry file creation CreateDirectories(ChunkPath.parent_path()); hRes = InternalCreateFile(); } if (FAILED(hRes)) { ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}' for write", ChunkPath)); } auto WriteBytes = [&](windows::FileHandle& PayloadFile, const void* Cursor, uint32_t Size) { HRESULT WriteRes = PayloadFile.Write(Cursor, Size); if (FAILED(WriteRes)) { ThrowSystemException(hRes, fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath)); } }; #else // Attempt to exclusively create the file. auto InternalCreateFile = [&] { int Fd = open(ChunkPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); if (Fd >= 0) { fchmod(Fd, 0666); } return Fd; }; int Fd = InternalCreateFile(); int Error = (Fd < 0) ? errno : 0; if (Error == ENOENT) { CreateDirectories(ChunkPath.parent_path()); Fd = InternalCreateFile(); Error = Fd < 0 ? 
errno : 0; } if (Error) { ThrowSystemError(Error, fmt::format("Failed to open shard file '{}' for write", ChunkPath)); } struct FdWrapper { ~FdWrapper() { Close(); } void Write(const void* Cursor, uint32_t Size) { ssize_t Written = write(Fd, Cursor, Size); if (Written == -1) { ThrowLastError(fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath)); } } void Close() { if (Fd >= 0) { close(Fd); Fd = -1; } } const char* ChunkPath = nullptr; uint64 ChunkSize = 0; int Fd; } PayloadFile = {.ChunkPath = ChunkPath.c_str(), .ChunkSize = ChunkSize, .Fd = Fd}; auto WriteBytes = [](FdWrapper& PayloadFile, const void* Cursor, uint32_t Size) { PayloadFile.Write(Cursor, Size); }; #endif // ZEN_PLATFORM_WINDOWS uint64_t ChunkRemain = ChunkSize; auto ChunkCursor = reinterpret_cast(Chunk.GetData()); try { while (ChunkRemain != 0) { uint32_t ByteCount = uint32_t(std::min(4 * 1024 * 1024ull, ChunkRemain)); WriteBytes(PayloadFile, ChunkCursor, ByteCount); ChunkCursor += ByteCount; ChunkRemain -= ByteCount; } } catch (const std::exception Ex) { PayloadFile.Close(); std::error_code DummyEc; RemoveFile(ChunkPath, DummyEc); throw; } bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); return CasStore::InsertResult{.New = IsNew}; } IoBuffer FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize) { ShardingHelper Name(m_RootDirectory, ChunkHash); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk) { uint64 ChunkSize = Chunk.GetSize(); if (ChunkSize == ExpectedSize) { return Chunk; } IoHash RawHash; uint64 RawSize; if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); Compressed) { if (Compressed.GetCompressedSize() == ChunkSize) { // The index is wrong due to a previous problem where we failed to delete a payload due to other process locking it. 
// The commit that adds this recovery also tightens up the problem where we didn't update the new filesize in the index // when overwriting a payload with same RawHash but a different size (we failed to properly update the index). // This can happen when the compression algorithm changes/is updated UpdateIndex(ChunkHash, ChunkSize); ZEN_WARN("FileCas recovered file {} with size {}, expected size {}", ChunkPath, ChunkSize, ExpectedSize); return Chunk; } else { ZEN_WARN("FileCas file {} compressed size {} does not match size on disk {}", ChunkPath, Compressed.GetCompressedSize(), ChunkSize); } } else { ZEN_WARN("FileCas file {} does not have a valid compressed buffer header", ChunkPath); } { std::error_code Ec; std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); RenameFile(ChunkPath, TempPath, Ec); if (!Ec) { Chunk.SetDeleteOnClose(true); ZEN_INFO("FileCas deleted malformed file {}", ChunkPath); } } // Remove the offending file from the index so we don't try to read from it again bool WasRemoved = false; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end() && It->second.Size == ExpectedSize) { m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); m_Index.erase(It); WasRemoved = true; } } if (WasRemoved) { m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = ChunkSize}); } } else { ZEN_WARN("FileCasStrategy::IterateChunks failed to get payload for '{}'", ChunkPath); } return {}; } IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FileCas::FindChunk"); ZEN_ASSERT(m_IsInitialized); uint64_t ExpectedSize = 0; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { ExpectedSize = It->second.Size; } else { return {}; } } return SafeOpenChunk(ChunkHash, ExpectedSize); } bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { ZEN_ASSERT(m_IsInitialized); RwLock::SharedLockScope 
_(m_Lock); return m_Index.contains(ChunkHash); } void FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { ZEN_TRACE_CPU("FileCas::DeleteChunk"); ShardingHelper Name(m_RootDirectory, ChunkHash); const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); uint64_t FileSize = static_cast(FileSizeFromPath(ChunkPath, Ec)); if (Ec) { ZEN_WARN("get file size FAILED, file cas '{}'", ChunkPath); FileSize = 0; } ZEN_DEBUG("deleting CAS payload file '{}' {}", ChunkPath, NiceBytes(FileSize)); RemoveFile(ChunkPath, Ec); if (!Ec || !IsFile(ChunkPath)) { { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); m_Index.erase(It); } } m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); } } void FileCasStrategy::FilterChunks(HashKeySet& InOutChunks) { ZEN_ASSERT(m_IsInitialized); // NOTE: it's not a problem now, but in the future if a GC should happen while this // is in flight, the result could be wrong since chunks could go away in the meantime. 
// // It would be good to have a pinning mechanism to make this less likely but // given that chunks could go away at any point after the results are returned to // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case InOutChunks.RemoveHashesIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } bool FileCasStrategy::IterateChunks(std::span ChunkHashes, const std::function& AsyncCallback, WorkerThreadPool* OptionalWorkerPool) { std::vector FoundChunkIndexes; std::vector FoundChunkExpectedSizes; { RwLock::SharedLockScope _(m_Lock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) { if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end()) { FoundChunkIndexes.push_back(ChunkIndex); FoundChunkExpectedSizes.push_back(KeyIt->second.Size); } } } std::atomic_bool Continue = true; if (!FoundChunkIndexes.empty()) { auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); if (!AsyncCallback(ChunkIndex, std::move(Payload))) { return false; } return true; }; Latch WorkLatch(1); for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { size_t ChunkIndex = FoundChunkIndexes[Index]; uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; if (!Continue) { break; } if (OptionalWorkerPool) { WorkLatch.AddCount(1); OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); if (!Continue) { return; } try { if (!ProcessOne(ChunkIndex, ExpectedSize)) { Continue = false; } } catch (const std::exception& Ex) { ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. 
Reason: '{}'", m_RootDirectory, ChunkHashes[ChunkIndex], Ex.what()); } }); } else { Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize); } } WorkLatch.CountDown(); WorkLatch.Wait(); } return Continue; } void FileCasStrategy::IterateChunks(std::function&& Callback) { ZEN_TRACE_CPU("FileCas::IterateChunks"); ZEN_ASSERT(m_IsInitialized); std::vector RawHashes; std::vector ExpectedSizes; { RwLock::SharedLockScope _(m_Lock); RawHashes.reserve(m_Index.size()); ExpectedSizes.reserve(m_Index.size()); for (const auto& It : m_Index) { RawHashes.push_back(It.first); ExpectedSizes.push_back(It.second.Size); } } for (size_t Index = 0; Index < RawHashes.size(); Index++) { const IoHash& ChunkHash = RawHashes[Index]; const uint64_t ExpectedSize = ExpectedSizes[Index]; IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); Callback(ChunkHash, std::move(Payload)); } } void FileCasStrategy::Flush() { ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::Flush"); m_CasLog.Flush(); MakeIndexSnapshot(); } void FileCasStrategy::ScrubStorage(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::ScrubStorage"); if (Ctx.IsSkipCas()) { ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory); return; } Stopwatch Timer; ZEN_INFO("scrubbing file CAS @ '{}'", m_RootDirectory); ZEN_ASSERT(m_IsInitialized); std::vector BadHashes; uint64_t ChunkCount{0}, ChunkBytes{0}; int DiscoveredFilesNotInIndex = 0; { std::vector ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); RwLock::ExclusiveLockScope _(m_Lock); for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { if (m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}).second) { m_TotalSize.fetch_add(static_cast(Entry.Size), std::memory_order::relaxed); m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); ++DiscoveredFilesNotInIndex; } } } ZEN_INFO("discovered {} files @ '{}' ({} not in index), scrubbing", m_Index.size(), m_RootDirectory, 
DiscoveredFilesNotInIndex); IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { if (!Payload) { BadHashes.push_back(Hash); return; } ++ChunkCount; ChunkBytes += Payload.GetSize(); IoBuffer InMemoryBuffer = IoBufferBuilder::ReadFromFileMaybe(Payload); IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(Payload, /* out */ RawHash, /* out */ RawSize)) { if (RawHash == Hash) { // Header hash matches the file name, full validation requires that // we check that the decompressed data hash also matches CompressedBuffer CompBuffer = CompressedBuffer::FromCompressedNoValidate(std::move(InMemoryBuffer)); OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (CompBuffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { if (BlockSize == 0) { BlockSize = 256 * 1024; } else if (BlockSize < (1024 * 1024)) { BlockSize = BlockSize * (1024 * 1024 / BlockSize); } std::unique_ptr DecompressionBuffer(new uint8_t[BlockSize]); IoHashStream Hasher; uint64_t RawOffset = 0; while (RawSize) { const uint64_t DecompressedBlockSize = Min(BlockSize, RawSize); bool Ok = CompBuffer.TryDecompressTo(MutableMemoryView((void*)DecompressionBuffer.get(), DecompressedBlockSize), RawOffset); if (Ok) { Hasher.Append(DecompressionBuffer.get(), DecompressedBlockSize); } RawSize -= DecompressedBlockSize; RawOffset += DecompressedBlockSize; } const IoHash FinalHash = Hasher.GetHash(); if (FinalHash == Hash) { // all good return; } } } } BadHashes.push_back(Hash); }); Ctx.ReportScrubbed(ChunkCount, ChunkBytes); if (!BadHashes.empty()) { ZEN_WARN("file CAS scrubbing: {} bad chunks found @ '{}'", BadHashes.size(), m_RootDirectory); if (Ctx.RunRecovery()) { ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); for (const IoHash& Hash : BadHashes) { std::error_code Ec; DeleteChunk(Hash, Ec); if (Ec) { ZEN_WARN("failed to delete file for chunk {}: {}", Hash, 
Ec.message()); } } } else { ZEN_WARN("recovery: NOT deleting backing files for {} bad chunks", BadHashes.size()); } } // Let whomever it concerns know about the bad chunks. This could // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do Ctx.ReportBadCidChunks(BadHashes); ZEN_INFO("file CAS @ '{}' scrubbed: {} chunks ({}), took {}", m_RootDirectory, ChunkCount, NiceBytes(ChunkBytes), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } GcStorageSize FileCasStrategy::StorageSize() const { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } bool FileCasStrategy::ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) { OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString()); return false; } if (Entry.Flags & (~FileCasIndexEntry::kTombStone)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Flags, Entry.Key.ToHexString()); return false; } if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) { return true; } uint64_t Size = Entry.Size; if (Size == 0) { OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString()); return false; } return true; } void FileCasStrategy::MakeIndexSnapshot() { ZEN_MEMSCOPE(GetFileCasTag()); ZEN_TRACE_CPU("FileCas::MakeIndexSnapshot"); using namespace filecas::impl; uint64_t LogCount = m_CasLog.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("write store snapshot for '{}'", m_RootDirectory); uint64_t EntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_RootDirectory, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); namespace fs = std::filesystem; fs::path IndexPath = GetIndexPath(m_RootDirectory); fs::path STmpIndexPath = GetTempIndexPath(m_RootDirectory); // Move index away, we keep it if something goes wrong if (IsFile(STmpIndexPath)) { 
std::error_code Ec; if (!RemoveFile(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); return; } } try { if (IsFile(IndexPath)) { RenameFile(IndexPath, STmpIndexPath); } // Write the current state of the location map to a new index state std::vector Entries; uint64_t IndexLogPosition = 0; { RwLock::SharedLockScope __(m_Lock); IndexLogPosition = m_CasLog.GetLogCount(); Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; for (const auto& Entry : m_Index) { FileCasIndexEntry& IndexEntry = Entries[EntryIndex++]; IndexEntry.Key = Entry.first; IndexEntry.Size = Entry.second.Size; } } TemporaryFile ObjectIndexFile; std::error_code Ec; ObjectIndexFile.CreateTemporary(IndexPath.parent_path(), Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for index snapshot at '{}'", IndexPath)); } filecas::impl::FileCasIndexHeader Header = {.EntryCount = Entries.size(), .LogPosition = IndexLogPosition}; Header.Checksum = filecas::impl::FileCasIndexHeader::ComputeChecksum(Header); ObjectIndexFile.Write(&Header, sizeof(filecas::impl::FileCasIndexHeader), 0); ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(FileCasIndexEntry), sizeof(filecas::impl::FileCasIndexHeader)); ObjectIndexFile.Flush(); ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); } EntryCount = Entries.size(); m_LogFlushPosition = IndexLogPosition; } catch (const std::exception& Err) { ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); // Restore any previous snapshot if (IsFile(STmpIndexPath)) { std::error_code Ec; RemoveFile(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless RenameFile(STmpIndexPath, IndexPath, Ec); if (Ec) { ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, 
Ec.message()); } } } if (IsFile(STmpIndexPath)) { std::error_code Ec; if (!RemoveFile(STmpIndexPath, Ec) || Ec) { ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); } } } uint64_t FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { ZEN_TRACE_CPU("FileCas::ReadIndexFile"); using namespace filecas::impl; std::vector Entries; if (IsFile(IndexPath)) { Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); uint64_t Size = ObjectIndexFile.FileSize(); if (Size >= sizeof(FileCasIndexHeader)) { uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(FileCasIndexHeader))) / sizeof(FileCasIndexEntry); FileCasIndexHeader Header; ObjectIndexFile.Read(&Header, sizeof(Header), 0); if ((Header.Magic == FileCasIndexHeader::ExpectedMagic) && (Header.Version == FileCasIndexHeader::CurrentVersion) && (Header.Checksum == FileCasIndexHeader::ComputeChecksum(Header)) && (Header.EntryCount <= ExpectedEntryCount)) { Entries.resize(Header.EntryCount); ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(FileCasIndexEntry), sizeof(FileCasIndexHeader)); std::string InvalidEntryReason; for (const FileCasIndexEntry& Entry : Entries) { if (!ValidateEntry(Entry, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); continue; } m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); } OutVersion = FileCasIndexHeader::CurrentVersion; return Header.LogPosition; } else { ZEN_WARN("skipping invalid index file '{}'", IndexPath); } } return 0; } if (IsDir(m_RootDirectory)) { ZEN_INFO("missing index for file cas, scanning for cas files in {}", m_RootDirectory); TCasLogFile CasLog; uint64_t TotalSize = 0; Stopwatch TotalTimer; 
const auto _ = MakeGuard([&] { ZEN_INFO("scanned file cas folder '{}' DONE after {}, found {} files totalling {}", m_RootDirectory, NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), CasLog.GetLogCount(), NiceBytes(TotalSize)); }); std::filesystem::path LogPath = GetLogPath(m_RootDirectory); std::vector ScannedEntries = FileCasStrategy::ScanFolderForCasFiles(m_RootDirectory); CasLog.Open(LogPath, CasLogFile::Mode::kTruncate); std::string InvalidEntryReason; for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); TotalSize += Entry.Size; CasLog.Append(Entry); } CasLog.Close(); } return 0; } uint64_t FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { ZEN_TRACE_CPU("FileCas::ReadLog"); using namespace filecas::impl; if (IsFile(LogPath)) { uint64_t LogEntryCount = 0; Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); TCasLogFile CasLog; CasLog.Open(LogPath, CasLogFile::Mode::kRead); if (CasLog.Initialize()) { uint64_t EntryCount = CasLog.GetLogCount(); if (EntryCount < SkipEntryCount) { ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } LogEntryCount = EntryCount - SkipEntryCount; m_Index.reserve(LogEntryCount); uint64_t InvalidEntryCount = 0; CasLog.Replay( [&](const FileCasIndexEntry& Record) { std::string InvalidEntryReason; if (Record.Flags & FileCasIndexEntry::kTombStone) { m_Index.erase(Record.Key); return; } if (!ValidateEntry(Record, InvalidEntryReason)) { ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); ++InvalidEntryCount; return; } m_Index.insert_or_assign(Record.Key, IndexEntry{.Size = Record.Size}); }, SkipEntryCount); if (InvalidEntryCount) { ZEN_WARN("found {} invalid entries in '{}'", 
InvalidEntryCount, LogPath); } return LogEntryCount; } } return 0; } std::vector FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) { ZEN_TRACE_CPU("FileCas::ScanFolderForCasFiles"); using namespace filecas::impl; std::vector Entries; struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir, std::vector& Entries) : RootDirectory(RootDir), Entries(Entries) {} virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); std::filesystem::path::string_type PathString = RelPath.native(); if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) { if (PathString.at(3) == std::filesystem::path::preferred_separator) { PathString.erase(3, 1); } PathString.append(File); // TODO: should validate that we're actually dealing with a valid hex string here #if ZEN_PLATFORM_WINDOWS StringBuilder<64> Utf8; WideToUtf8(PathString, Utf8); IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); #else IoHash NameHash = IoHash::FromHexString(PathString); #endif Entries.emplace_back(FileCasIndexEntry{.Key = NameHash, .Size = FileSize}); } } virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } const std::filesystem::path& RootDirectory; std::vector& Entries; } CasVisitor{RootDir, Entries}; FileSystemTraversal Traversal; Traversal.TraverseFileSystem(RootDir, CasVisitor); return Entries; }; class FileCasStoreCompactor : public GcStoreCompactor { public: FileCasStoreCompactor(FileCasStrategy& Owner, std::vector&& ReferencesToClean) : m_FileCasStrategy(Owner) , m_ReferencesToClean(std::move(ReferencesToClean)) { m_ReferencesToClean.shrink_to_fit(); } virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function&) override { ZEN_MEMSCOPE(GetFileCasTag()); 
ZEN_TRACE_CPU("FileCas::CompactStore");

        // Local logger accessor; presumably consumed by the ZEN_INFO/ZEN_WARN macros so output goes to Ctx.Logger
        auto Log = [&Ctx]() { return Ctx.Logger; };

        Stopwatch Timer;
        // On every exit path (including cancellation) release the candidate list and, when verbose, report the totals
        const auto _ = MakeGuard([&] {
            filecas::impl::Reset(m_ReferencesToClean);
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: filecas [COMPACT] '{}': RemovedDisk: {} in {}",
                     m_FileCasStrategy.m_RootDirectory,
                     NiceBytes(Stats.RemovedDisk),
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });

        ZEN_INFO("GCV2: filecas [COMPACT] '{}': {} {} files",
                 m_FileCasStrategy.m_RootDirectory,
                 Ctx.Settings.IsDeleteMode ? "Removing" : "Checking",
                 m_ReferencesToClean.size());

        // Count of existing payload files that were eligible for deletion but kept because delete mode is off
        size_t Skipped = 0;
        for (const IoHash& ChunkHash : m_ReferencesToClean)
        {
            FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory, ChunkHash);
            const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
            {
                // The per-hash lock is taken exclusively and the index lock shared for the whole decision + removal,
                // presumably so a concurrent insert of the same hash cannot interleave with the file removal
                RwLock::ExclusiveLockScope HashLock(m_FileCasStrategy.LockForHash(ChunkHash));
                RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock);
                if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end())
                {
                    // Not regarded as pruned (the hash is still in the index), leave it be
                    continue;
                }
                if (Ctx.IsCancelledFlag.load())
                {
                    return;
                }
                if (Ctx.Settings.IsDeleteMode)
                {
                    if (Ctx.Settings.Verbose)
                    {
                        ZEN_INFO("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'", m_FileCasStrategy.m_RootDirectory, ChunkPath);
                    }
                    // Size is sampled before removal so it can be credited to Stats.RemovedDisk; 0 if it cannot be read
                    std::error_code Ec;
                    uint64_t SizeOnDisk = FileSizeFromPath(ChunkPath, Ec);
                    if (Ec)
                    {
                        SizeOnDisk = 0;
                    }
                    bool Existed = RemoveFile(ChunkPath, Ec);
                    if (Ec)
                    {
                        // Target file may be open for read, attempt to move it to a temp file and mark it delete on close
                        IoBuffer OldChunk;
                        {
                            OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
                        }
                        if (OldChunk)
                        {
                            // Renamed into the same shard directory under a random Oid name
                            std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());
                            // NOTE(review): relies on RenameFile clearing Ec on success - confirm
                            RenameFile(ChunkPath, TempPath, Ec);
                            if (!Ec)
                            {
                                OldChunk.SetDeleteOnClose(true);
                                Existed = true;
                            }
                        }
                    }
                    if (Ec)
                    {
                        ZEN_WARN("GCV2: filecas [COMPACT] '{}': Failed deleting CAS payload file '{}'. Reason '{}'",
                                 m_FileCasStrategy.m_RootDirectory,
                                 ChunkPath,
                                 Ec.message());
                        continue;
                    }
                    if (!Existed)
                    {
                        continue;
                    }
                    Stats.RemovedDisk += SizeOnDisk;
                }
                else
                {
                    // Dry run: only check whether the payload file exists and count it as skipped
                    std::error_code Ec;
                    bool Existed = IsFile(ChunkPath, Ec);
                    if (Ec)
                    {
                        if (Ctx.Settings.Verbose)
                        {
                            ZEN_INFO("GCV2: filecas [COMPACT] '{}': Failed checking CAS payload file '{}'. Reason '{}'",
                                     m_FileCasStrategy.m_RootDirectory,
                                     ChunkPath,
                                     Ec.message());
                        }
                        continue;
                    }
                    if (!Existed)
                    {
                        continue;
                    }
                    Skipped++;
                }
            }
        }
        if (Skipped > 0)
        {
            if (Ctx.Settings.Verbose)
            {
                ZEN_INFO("GCV2: filecas [COMPACT] '{}': Skipped deleting of {} eligible files", m_FileCasStrategy.m_RootDirectory, Skipped);
            }
        }
        filecas::impl::Reset(m_ReferencesToClean);
    }

    virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }

private:
    FileCasStrategy& m_FileCasStrategy;
    // Hashes handed over by FileCasReferencePruner as unreferenced; released once compaction finishes
    std::vector m_ReferencesToClean;
};

// GC reference pruner for the file CAS. Holds the hashes captured by CreateReferencePruner, asks the GC which of them
// are unused and, in delete mode, drops those from the in-memory index (writing tombstones to the CAS log). In either
// mode the unused hashes are passed to a new FileCasStoreCompactor, which handles the payload files on disk.
class FileCasReferencePruner : public GcReferencePruner
{
public:
    FileCasReferencePruner(FileCasStrategy& Owner, std::vector&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {}

    virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }

    // Returns nullptr when nothing is unused or GC was cancelled; otherwise a heap-allocated compactor
    // (presumably owned by the caller from here on - confirm against GcManager)
    virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) override
    {
        ZEN_MEMSCOPE(GetFileCasTag());
        ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData");

        auto Log = [&Ctx]() { return Ctx.Logger; };

        Stopwatch Timer;
        const auto _ = MakeGuard([&] {
            if (!Ctx.Settings.Verbose)
            {
                return;
            }
            ZEN_INFO("GCV2: filecas [PRUNE] '{}': Count: {}, Unreferenced: {}, FreedMemory: {} in {}",
                     m_FileCasStrategy.m_RootDirectory,
                     Stats.CheckedCount,
                     Stats.FoundCount,
                     NiceBytes(Stats.FreedMemory),
                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
        });

        std::span UnusedCids = GetUnusedReferences(m_Cids);
        Stats.CheckedCount = m_Cids.size();
        Stats.FoundCount = UnusedCids.size();
        if (UnusedCids.empty())
        {
            // Nothing to collect
            return nullptr;
        }

        if (Ctx.Settings.IsDeleteMode)
        {
            std::vector ExpiredEntries;
            ExpiredEntries.reserve(UnusedCids.size());
            {
                // Exclusive store lock: the index, m_TotalSize and the CAS log are updated together
                RwLock::ExclusiveLockScope __(m_FileCasStrategy.m_Lock);
                if (Ctx.IsCancelledFlag.load())
                {
                    return nullptr;
                }

                // Only hashes still present in the index produce a tombstone and count towards the removed size
                uint64_t RemovedSize = 0;
                for (const IoHash& ChunkHash : UnusedCids)
                {
                    if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end())
                    {
                        uint64_t FileSize = It->second.Size;
                        ExpiredEntries.push_back(
                            {.Key = ChunkHash, .Flags = FileCasStrategy::FileCasIndexEntry::kTombStone, .Size = FileSize});
                        RemovedSize += FileSize;
                    }
                }
                m_FileCasStrategy.m_TotalSize.fetch_sub(RemovedSize, std::memory_order_relaxed);
                if (!ExpiredEntries.empty())
                {
                    for (const FileCasStrategy::FileCasIndexEntry& Entry : ExpiredEntries)
                    {
                        m_FileCasStrategy.m_Index.erase(Entry.Key);
                        Stats.DeletedCount++;
                    }
                    m_FileCasStrategy.m_CasLog.Append(ExpiredEntries);
                    m_FileCasStrategy.m_CasLog.Flush();
                }
            }
        }

        // Handed over in both modes; the compactor itself checks IsDeleteMode before touching files
        return new FileCasStoreCompactor(m_FileCasStrategy, std::vector(UnusedCids.begin(), UnusedCids.end()));
    }

private:
    FileCasStrategy& m_FileCasStrategy;
    // Snapshot of the index keys taken by CreateReferencePruner
    std::vector m_Cids;
};

// Name used in GC logging for this store: "filecas: '<root directory>'"
std::string
FileCasStrategy::GetGcName(GcCtx&)
{
    return fmt::format("filecas: '{}'", m_RootDirectory.string());
}

// Snapshots every hash currently in the index (under a shared lock) and, if FilterReferences returns true for that
// list, returns a FileCasReferencePruner owning it. Returns nullptr for an empty store, on cancellation, or when
// FilterReferences returns false.
GcReferencePruner*
FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
{
    ZEN_MEMSCOPE(GetFileCasTag());
    ZEN_TRACE_CPU("FileCas::CreateReferencePruner");

    auto Log = [&Ctx]() { return Ctx.Logger; };

    Stopwatch Timer;
    const auto _ = MakeGuard([&] {
        if (!Ctx.Settings.Verbose)
        {
            return;
        }
        ZEN_INFO("GCV2: filecas [CREATE PRUNER] '{}' in {}", m_RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });

    std::vector CidsToCheck;
    {
        RwLock::SharedLockScope __(m_Lock);
        if (m_Index.empty())
        {
            // Value-initialized pointer, i.e. nullptr: nothing to prune
            return {};
        }
        if (Ctx.IsCancelledFlag.load())
        {
            return nullptr;
        }
        CidsToCheck.reserve(m_Index.size());
        for (const auto& It : m_Index)
        {
            CidsToCheck.push_back(It.first);
        }
    }
    // NOTE(review): FilterReferences presumably narrows CidsToCheck in place and returns false when nothing is left
    if (FilterReferences(Ctx, fmt::format("filecas [CREATE PRUNER] '{}'", m_RootDirectory), CidsToCheck))
    {
        return new FileCasReferencePruner(*this, std::move(CidsToCheck));
    }
    return nullptr;
}

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS

// Placeholder: this test case currently has an empty body and checks nothing
TEST_CASE("cas.chunk.mismatch")
{
}

// Inserts payloads from delete-on-close temporary files (default insert mode) and checks that:
// - a new payload's source file is gone right after the insert,
// - re-inserting the same hash reports New == false and leaves the source file until the buffer is released,
// - a second, differently compressed payload of the same data is expected to report New == true and to be what a
//   later FindChunk returns, while a buffer fetched earlier still decodes.
TEST_CASE("cas.chunk.moveoverwrite")
{
    ScopedTemporaryDirectory TempDir;

    GcManager Gc;
    FileCasStrategy FileCas(Gc);
    FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

    {
        std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
        IoBuffer PayloadBlob(1024 * 1024);
        memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
        CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
        {
            WriteFile(Payload1Path, CompressedPayload1.GetCompressed());
            IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
            Payload1.SetDeleteOnClose(true);
            CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash());
            CHECK_EQ(Result.New, true);
            CHECK(!IsFile(Payload1Path));
        }
        {
            // Same path and content again: already stored, so the source file is left for delete-on-close
            std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"};
            WriteFile(Payload1BPath, CompressedPayload1.GetCompressed());
            IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath);
            Payload1B.SetDeleteOnClose(true);
            CasStore::InsertResult Result = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash());
            CHECK_EQ(Result.New, false);
            CHECK(IsFile(Payload1BPath));
            Payload1B = {};
            CHECK(!IsFile(Payload1BPath));
        }
        IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload);
        CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());
        {
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
            CHECK(ValidatePayload1);
        }

        // Same raw data, different compression settings, so a different compressed size
        CompressedBuffer CompressedPayload2 =
            CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
        std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"};
        WriteFile(Payload2Path,
CompressedPayload2.GetCompressed());
        IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path);
        Payload2.SetDeleteOnClose(true);
        {
            CasStore::InsertResult Result = FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash());
            CHECK_EQ(Result.New, true);
        }
        Payload2 = {};
        CHECK(!IsFile(Payload2Path));
        {
            // The buffer fetched before the overwrite must still decode
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
            CHECK(ValidatePayload1);
        }
        // Looking up CompressedPayload1's raw hash is expected to return the second payload now
        IoBuffer FetchedPayload2 = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload2);
        CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize());
        {
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize);
            CHECK(ValidatePayload2);
        }
    }
}

// Same scenario as cas.chunk.moveoverwrite, but inserting with CasStore::InsertMode::kCopyOnly: the source file must
// still exist right after the insert and only disappear once the caller's delete-on-close buffer is released.
TEST_CASE("cas.chunk.copyoverwrite")
{
    ScopedTemporaryDirectory TempDir;

    GcManager Gc;
    FileCasStrategy FileCas(Gc);
    FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

    {
        std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
        IoBuffer PayloadBlob(1024 * 1024);
        memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
        CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
        {
            WriteFile(Payload1Path, CompressedPayload1.GetCompressed());
            IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
            Payload1.SetDeleteOnClose(true);
            CasStore::InsertResult Result =
                FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
            CHECK_EQ(Result.New, true);
            CHECK(IsFile(Payload1Path));
            Payload1 = {};
            CHECK(!IsFile(Payload1Path));
        }
        {
            std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"};
            WriteFile(Payload1BPath, CompressedPayload1.GetCompressed());
            IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath);
            Payload1B.SetDeleteOnClose(true);
            CasStore::InsertResult Result
                = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
            CHECK_EQ(Result.New, false);
            CHECK(IsFile(Payload1BPath));
            Payload1B = {};
            CHECK(!IsFile(Payload1BPath));
        }
        IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload);
        CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());
        {
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
            CHECK(ValidatePayload1);
        }

        // Same raw data, different compression settings, so a different compressed size
        CompressedBuffer CompressedPayload2 =
            CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
        std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"};
        WriteFile(Payload2Path, CompressedPayload2.GetCompressed());
        IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path);
        Payload2.SetDeleteOnClose(true);
        {
            CasStore::InsertResult Result =
                FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
            CHECK_EQ(Result.New, true);
        }
        Payload2 = {};
        CHECK(!IsFile(Payload2Path));
        {
            // The buffer fetched before the overwrite must still decode
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
            CHECK(ValidatePayload1);
        }
        IoBuffer FetchedPayload2 = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload2);
        CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize());
        {
            IoHash RawHash;
            uint64_t RawSize;
            CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize);
            CHECK(ValidatePayload2);
        }
    }
}

// Overwrites the stored payload file behind the store's back. A replacement with a different (valid) size is still
// found and returned with its new size. After overwriting it with 1024 bytes of non-payload data, HaveChunk is still
// true until FindChunk fails to return the chunk, after which HaveChunk is expected to report false.
TEST_CASE("cas.chunk.recoverbadsize")
{
    ScopedTemporaryDirectory TempDir;

    GcManager Gc;
    FileCasStrategy FileCas(Gc);
    FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

    {
        // Payload1Path is not used by this test
        std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
        IoBuffer PayloadBlob(1024 * 1024);
        memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
        CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
        CompressedBuffer CompressedPayload2 =
            CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
        CasStore::InsertResult Result =
            FileCas.InsertChunk(CompressedPayload1.GetCompressed().Flatten().AsIoBuffer(), CompressedPayload1.DecodeRawHash());
        CHECK_EQ(Result.New, true);
        IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload);
        CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());

        // Recover the on-disk path of the stored payload from the fetched buffer's file handle
        IoBufferFileReference Ref{nullptr, 0, 0};
        CHECK(FetchedPayload.GetFileReference(Ref));
        std::error_code Ec;
        std::filesystem::path CASPath = PathFromHandle(Ref.FileHandle, Ec);
        CHECK(!Ec);
        FetchedPayload = {};

        WriteFile(CASPath, CompressedPayload2.GetCompressed());
        FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(FetchedPayload);
        CHECK(FetchedPayload.GetSize() == CompressedPayload2.GetCompressedSize());
        FetchedPayload = {};

        WriteFile(CASPath, IoBuffer(1024));
        CHECK(FileCas.HaveChunk(CompressedPayload1.DecodeRawHash()));
        FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
        CHECK(!FetchedPayload);
        CHECK(!FileCas.HaveChunk(CompressedPayload1.DecodeRawHash()));
    }
}

// Inserts a payload written through BasicFile and opened with MakeFromTemporaryFile; expects a new chunk
TEST_CASE("cas.file.move")
{
    // specifying an absolute path here can be helpful when using procmon to dig into things
    ScopedTemporaryDirectory TempDir;  // {"d:\\filecas_testdir"};

    GcManager Gc;
    FileCasStrategy FileCas(Gc);
    FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);

    {
        std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};

        IoBuffer ZeroBytes{1024 * 1024};
        IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);

        BasicFile PayloadFile;
        PayloadFile.Open(Payload1Path, BasicFile::Mode::kTruncate);
        PayloadFile.Write(ZeroBytes, 0);
        PayloadFile.Close();

        IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);

        CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash);
        CHECK_EQ(Result.New, true);
    }

    // Disabled multi-threaded insert stress test.
    // NOTE(review): PayloadHashes is filled worker-major (kWorkers * kItemCount entries) but indexed with i only below;
    // that only matches if every worker's payload i hashes identically, which depends on whether IoBuffer{1024}
    // zero-fills the bytes after the first uint32_t - confirm before re-enabling.
#    if 0
    SUBCASE("stresstest")
    {
        std::vector PayloadHashes;

        const int kWorkers = 64;
        const int kItemCount = 128;

        for (int w = 0; w < kWorkers; ++w)
        {
            for (int i = 0; i < kItemCount; ++i)
            {
                IoBuffer Payload{1024};
                *reinterpret_cast(Payload.MutableData()) = i;
                PayloadHashes.push_back(IoHash::HashBuffer(Payload));

                std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
                WriteFile(PayloadPath, Payload);
            }
        }

        std::barrier Sync{kWorkers};

        auto PopulateAll = [&](int w) {
            std::vector Buffers;

            for (int i = 0; i < kItemCount; ++i)
            {
                std::filesystem::path PayloadPath{TempDir.Path() / fmt::format("payload_{}_{}", w, i)};
                IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
                Buffers.push_back(Payload);
                Sync.arrive_and_wait();
                CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
            }
        };

        std::vector Threads;

        for (int i = 0; i < kWorkers; ++i)
        {
            Threads.push_back(std::jthread(PopulateAll, i));
        }

        for (std::jthread& Thread : Threads)
        {
            Thread.join();
        }
    }
#    endif
}

#endif

// Intentionally empty; presumably referenced from elsewhere so this translation unit (and its tests) gets linked
void
filecas_forcelink()
{
}

}  // namespace zen