diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-22 10:58:20 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-22 10:58:20 +0100 |
| commit | 36658631e3f29b27d08b36fa6cf143d6902b7789 (patch) | |
| tree | 8c618ec8081a40a479dc98dfd91b96b8b1e21e9c /src | |
| parent | fixed off-by-one in GetPidStatus (Linux) which might cause spurious errors (#... (diff) | |
| download | zen-36658631e3f29b27d08b36fa6cf143d6902b7789.tar.xz zen-36658631e3f29b27d08b36fa6cf143d6902b7789.zip | |
fix inconsistencies in filecas due to failing to remove payload file during GC (#224)
make sure we rewrite filecas entries if chunk size changes (due to compression changes)
hardening of move/write files in filecas
if we encounter a filecas entry with mismatching size (due to a pre-existing bug) we validate the file and update the index
if we find a bad filecas file on disk we now attempt to remove it
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/filesystem.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 857 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 4 |
4 files changed, 438 insertions, 429 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 93383a656..9ca5f1131 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -902,14 +902,14 @@ MoveToFile(std::filesystem::path Path, IoBuffer Data) { return false; } - int Ret = link(SourcePath.c_str(), Path.c_str()); + int Ret = rename(SourcePath.c_str(), Path.c_str()); if (Ret < 0) { int32_t err = errno; if (err == ENOENT) { zen::CreateDirectories(Path.parent_path()); - Ret = link(SourcePath.c_str(), Path.c_str()); + Ret = rename(SourcePath.c_str(), Path.c_str()); } } if (Ret < 0) diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 7f1300177..b3309f7a7 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -257,7 +257,7 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) { - m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size()); + m_LocationMap.insert_or_assign(DiskIndexEntry.Key, m_Locations.size()); m_Locations.push_back(DiskIndexEntry.Location); } } diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 6d5bcff96..2031804c9 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -96,7 +96,7 @@ namespace filecas::impl { FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { - ShardedPath.Append(RootPath.c_str()); + ShardedPath.Append(RootPath); ExtendableStringBuilder<64> HashString; ChunkHash.ToHexString(HashString); @@ -245,6 +245,28 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN } } +bool +FileCasStrategy::UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize) +{ + uint64 OldChunkSize = 0; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) + { + OldChunkSize = It->second.Size; + if 
(OldChunkSize == ChunkSize) + { + return false; + } + } + m_Index.insert_or_assign(ChunkHash, IndexEntry{.Size = ChunkSize}); + } + m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed); + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + return true; +} + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { @@ -256,437 +278,133 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif - if (Mode == CasStore::InsertMode::kCopyOnly) + uint64_t ChunkSize = Chunk.GetSize(); { + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) + if (It->second.Size == ChunkSize) { return CasStore::InsertResult{.New = false}; } } - return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash); } - // File-based chunks have special case handling whereby we move the file into - // place in the file store directory, thus avoiding unnecessary copying - - IoBufferFileReference FileRef; - bool IsWholeFile = Chunk.IsWholeFile(); - if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef)) - { - ZEN_TRACE_CPU("FileCas::InsertChunk::Move"); - - { - bool Exists = true; - { - RwLock::SharedLockScope _(m_Lock); - Exists = m_Index.contains(ChunkHash); - } - if (Exists) - { - return CasStore::InsertResult{.New = false}; - } - } - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - const HANDLE ChunkFileHandle = FileRef.FileHandle; - // See if file already exists - { - ZEN_TRACE_CPU("FileCas::InsertChunk::Exists"); - - windows::FileHandle PayloadFile; - - if (HRESULT hRes = 
PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) - { - // If we succeeded in opening the target file then we don't need to do anything else because it already exists - // and should contain the content we were about to insert - - // We do need to ensure the source file goes away on close, however - size_t ChunkSize = Chunk.GetSize(); - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - - return CasStore::InsertResult{.New = IsNew}; - } - else - { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - } - } - else - { - if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) - { - // Shard directory does not exist - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) - { - // Shard directory exists, but not the file - } - else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION)) - { - // Sharing violation, likely because we are trying to open a file - // which has been renamed on another thread, and the file handle - // used to rename it is still open. 
We handle this case below - // instead of here - } - else - { - ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes); - } - } - } - - std::filesystem::path FullPath(Name.ShardedPath.c_str()); - - std::filesystem::path FilePath = FullPath.parent_path(); - std::wstring FileName = FullPath.native(); - - const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); - FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); - memset(RenameInfo, 0, BufferSize); - - RenameInfo->ReplaceIfExists = FALSE; - RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); - memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); - RenameInfo->FileName[FileName.size()] = 0; - - auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); - - // Try to move file into place - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - - if (!Success) - { - // The rename/move could fail because the target directory does not yet exist. This code attempts - // to create it - - windows::FileHandle DirHandle; - - auto InternalCreateDirectoryHandle = [&] { - return DirHandle.Create(FilePath.c_str(), - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS); - }; - - // It's possible for several threads to enter this logic trying to create the same - // directory. 
Only one will create the directory of course, but all threads will - // make it through okay - - HRESULT hRes = InternalCreateDirectoryHandle(); - - if (FAILED(hRes)) - { - // TODO: we can handle directory creation more intelligently and efficiently than - // this currently does - - CreateDirectories(FilePath.c_str()); - - hRes = InternalCreateDirectoryHandle(); - } - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath)); - } - - // Retry rename/move - - Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); - } - - if (Success) - { - m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - Chunk.SetDeleteOnClose(false); + ShardingHelper Name(m_RootDirectory, ChunkHash); + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - HashLock.ReleaseNow(); + const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - const DWORD LastError = GetLastError(); + if (Mode == CasStore::InsertMode::kMayBeMovedInPlace) + { + // File-based chunks have special case handling whereby we move the file into + // place in the file store directory, thus avoiding unnecessary copying - if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) + if (MoveToFile(ChunkPath, Chunk)) { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - + bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); return CasStore::InsertResult{.New = IsNew}; } - ZEN_WARN("rename of 
CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LastError), - ChunkHash); + } -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + // Target file may be open for read, attempt to move it to a temp file and mark it as delete on close + IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + if (OldChunk) + { + std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); std::error_code Ec; - std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle, Ec); + std::filesystem::rename(ChunkPath, TempPath, Ec); if (Ec) { - ZEN_WARN("link of CAS payload file {} failed ('{}'), falling back to regular write for insert of {}", - FileRef.FileHandle, - Ec.message(), - ChunkHash); + throw std::system_error(Ec, fmt::format("unable to move existing CAS file {} to {}", ChunkPath, TempPath)); } - else + OldChunk.SetDeleteOnClose(true); + + if (Mode == CasStore::InsertMode::kMayBeMovedInPlace) { - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); - if (Ret < 0 && zen::GetLastError() == ENOENT) - { - // Destination directory doesn't exist. Create it any try again. - CreateDirectories(DestPath.parent_path().c_str()); - Ret = link(SourcePath.c_str(), DestPath.c_str()); - } - if (Ret == 0) + if (MoveToFile(ChunkPath, Chunk)) { - m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - Chunk.SetDeleteOnClose(false); - - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } + bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); return CasStore::InsertResult{.New = IsNew}; } - else - { - int LinkError = zen::GetLastError(); - - // It is possible that someone beat us to it in linking the file. 
In that - // case a "file exists" error is okay. All others are not. - if (LinkError == EEXIST) - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", - GetSystemErrorAsString(LinkError), - ChunkHash); - } } -#endif // ZEN_PLATFORM_* } - return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash); -} - -CasStore::InsertResult -FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) -{ - ZEN_TRACE_CPU("FileCas::InsertChunkData"); - - ZEN_ASSERT(m_IsInitialized); - + // We moved the file, make sure the index reflects that { - RwLock::SharedLockScope _(m_Lock); - if (m_Index.contains(ChunkHash)) - { - return {.New = false}; - } - } - - ZEN_TRACE_CPU("FileCas::InsertChunkData::Write"); - - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - // See if file already exists - -#if ZEN_PLATFORM_WINDOWS - windows::FileHandle PayloadFile; - - HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the - // content we were about to insert - - bool IsNew = false; + uint64 OldChunkSize = (uint64)-1; { RwLock::ExclusiveLockScope _(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } - - PayloadFile.Close(); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - if 
(access(Name.ShardedPath.c_str(), F_OK) == 0) - { - return CasStore::InsertResult{.New = false}; - } -#endif - - RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); - -#if ZEN_PLATFORM_WINDOWS - // For now, use double-checked locking to see if someone else was first - - hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); - - if (SUCCEEDED(hRes)) - { - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) - { - // If we succeeded in opening the file then and the size is correct we don't need to do anything - // else because someone else managed to create the file before we did. Just return. - - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) + if (auto It = m_Index.find(ChunkHash); It != m_Index.end()) { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + OldChunkSize = It->second.Size; + m_Index.erase(It); } - return CasStore::InsertResult{.New = IsNew}; } - else + if (OldChunkSize != (uint64)-1) { - ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. 
Trying to overwrite", - Name.ShardedPath.ToUtf8(), - ChunkSize, - FileSize); - PayloadFile.Close(); + m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = OldChunkSize}); + m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed); } } - else if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) - { - ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); - } - auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; - - hRes = InternalCreateFile(); + // We need to do a copy of the source data +#if ZEN_PLATFORM_WINDOWS + windows::FileHandle PayloadFile; + auto InternalCreateFile = [&] { return PayloadFile.Create(ChunkPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + HRESULT hRes = InternalCreateFile(); if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { // Ensure parent directories exist and retry file creation - CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); + CreateDirectories(ChunkPath.parent_path()); hRes = InternalCreateFile(); } - if (FAILED(hRes)) { - ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8())); + ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}' for write", ChunkPath)); } + auto WriteBytes = [&](windows::FileHandle& PayloadFile, const void* Cursor, uint32_t Size) { + HRESULT WriteRes = PayloadFile.Write(Cursor, Size); + if (FAILED(WriteRes)) + { + ThrowSystemException(hRes, fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath)); + } + }; #else // Attempt to exclusively create the file. 
auto InternalCreateFile = [&] { - int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); + int Fd = open(ChunkPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666); if (Fd >= 0) { fchmod(Fd, 0666); } return Fd; }; - int Fd = InternalCreateFile(); - if (Fd < 0) + int Fd = InternalCreateFile(); + int Error = (Fd < 0) ? errno : 0; + if (Error == ENOENT) { - switch (zen::GetLastError()) - { - case EEXIST: - // Another thread has beat us to it so we're golden. - { - HashLock.ReleaseNow(); - - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); - } - return {.New = IsNew}; - } - break; - - case ENOENT: - if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) - { - Fd = InternalCreateFile(); - if (Fd >= 0) - { - break; - } - } - ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); - - default: - ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); - } + CreateDirectories(ChunkPath.parent_path()); + Fd = InternalCreateFile(); + Error = Fd < 0 ? 
errno : 0; + } + if (Error) + { + ThrowSystemError(Error, fmt::format("Failed to open shard file '{}' for write", ChunkPath)); } struct FdWrapper { ~FdWrapper() { Close(); } - void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); } + void Write(const void* Cursor, uint32_t Size) + { + ssize_t Written = write(Fd, Cursor, Size); + if (Written == -1) + { + ThrowLastError(fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath)); + } + } void Close() { if (Fd >= 0) @@ -695,42 +413,36 @@ FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t Chunk Fd = -1; } } - int Fd; - } PayloadFile = {Fd}; -#endif // ZEN_PLATFORM_WINDOWS + const char* ChunkPath = nullptr; + uint64 ChunkSize = 0; + int Fd; + } PayloadFile = {.ChunkPath = ChunkPath.c_str(), .ChunkSize = ChunkSize, .Fd = Fd}; - size_t ChunkRemain = ChunkSize; - auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData); - - while (ChunkRemain != 0) - { - uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain)); - - PayloadFile.Write(ChunkCursor, ByteCount); - - ChunkCursor += ByteCount; - ChunkRemain -= ByteCount; - } - - // We cannot rely on RAII to close the file handle since it would be closed - // *after* the lock is released due to the initialization order - PayloadFile.Close(); - - m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); + auto WriteBytes = [](FdWrapper& PayloadFile, const void* Cursor, uint32_t Size) { PayloadFile.Write(Cursor, Size); }; +#endif // ZEN_PLATFORM_WINDOWS - HashLock.ReleaseNow(); + uint64_t ChunkRemain = ChunkSize; + auto ChunkCursor = reinterpret_cast<const uint8_t*>(Chunk.GetData()); - bool IsNew = false; + try { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second; + while (ChunkRemain != 0) + { + uint32_t ByteCount = uint32_t(std::min<uint64_t>(4 * 1024 * 1024ull, ChunkRemain)); + WriteBytes(PayloadFile, ChunkCursor, 
ByteCount); + ChunkCursor += ByteCount; + ChunkRemain -= ByteCount; + } } - if (IsNew) + catch (const std::exception Ex) { - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + PayloadFile.Close(); + std::error_code DummyEc; + std::filesystem::remove(ChunkPath, DummyEc); + throw; } - - return {.New = IsNew}; + bool IsNew = UpdateIndex(ChunkHash, Chunk.Size()); + return CasStore::InsertResult{.New = IsNew}; } IoBuffer @@ -753,12 +465,72 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) } } - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - RwLock::SharedLockScope _(LockForHash(ChunkHash)); + ShardingHelper Name(m_RootDirectory, ChunkHash); + const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); + + RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash)); - if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); Chunk.GetSize() == ExpectedSize) + if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk) { - return Chunk; + uint64 ChunkSize = Chunk.GetSize(); + if (ChunkSize == ExpectedSize) + { + return Chunk; + } + + IoHash RawHash; + uint64 RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); Compressed) + { + if (Compressed.GetCompressedSize() == ChunkSize) + { + // The index is wrong due to a previous problem where we failed to delete a payload due to other process locking it. + // The commit that adds this recovery also tightens up the problem where we didn't update the new filesize in the index + // when overwriting a payload with same RawHash but a different size (we failed to properly update the index). 
+ // This can happen when the compression algorithm changes/is updated + UpdateIndex(ChunkHash, ChunkSize); + ZEN_WARN("FileCas recovered file {} with size {}, expected size {}", ChunkPath, ChunkSize, ExpectedSize); + return Chunk; + } + else + { + ZEN_WARN("FileCas file {} compressed size {} does not match size on disk {}", + ChunkPath, + Compressed.GetCompressedSize(), + ChunkSize); + } + } + else + { + ZEN_WARN("FileCas file {} does not have a valid compressed buffer header", ChunkPath); + } + + { + std::error_code Ec; + std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); + std::filesystem::rename(ChunkPath, TempPath, Ec); + if (!Ec) + { + Chunk.SetDeleteOnClose(true); + ZEN_INFO("FileCas deleted malformed file {}", ChunkPath); + } + } + + // Remove the offending file from the index so we don't try to read from it again + bool WasRemoved = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_Index.find(ChunkHash); It != m_Index.end() && It->second.Size == ExpectedSize) + { + m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed); + m_Index.erase(It); + WasRemoved = true; + } + } + if (WasRemoved) + { + m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = ChunkSize}); + } } return {}; @@ -778,19 +550,19 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) { ZEN_TRACE_CPU("FileCas::DeleteChunk"); - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - - uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec)); + ShardingHelper Name(m_RootDirectory, ChunkHash); + const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); + uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(ChunkPath, Ec)); if (Ec) { - ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); + ZEN_WARN("get file size FAILED, file cas '{}'", ChunkPath); FileSize = 0; } - ZEN_DEBUG("deleting CAS 
payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize)); - std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + ZEN_DEBUG("deleting CAS payload file '{}' {}", ChunkPath, NiceBytes(FileSize)); + std::filesystem::remove(ChunkPath, Ec); - if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str())) + if (!Ec || !std::filesystem::exists(ChunkPath)) { { RwLock::ExclusiveLockScope _(m_Lock); @@ -840,8 +612,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, if (!FoundChunkIndexes.empty()) { auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); if (Payload) { if (!AsyncCallback(ChunkIndex, std::move(Payload))) @@ -906,8 +678,8 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& for (const auto& It : m_Index) { const IoHash& NameHash = It.first; - ShardingHelper Name(m_RootDirectory.c_str(), NameHash); - IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); + ShardingHelper Name(m_RootDirectory, NameHash); + IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath()); Callback(NameHash, std::move(Payload)); } } @@ -962,7 +734,7 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx) RwLock::ExclusiveLockScope _(m_Lock); for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { - if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second) + if (m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}).second) { m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed); m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size}); @@ -1455,7 +1227,8 @@ public: size_t Skipped = 0; for (const IoHash& 
ChunkHash : m_ReferencesToClean) { - FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash); + FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory, ChunkHash); + const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath(); { RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock); if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end()) @@ -1474,23 +1247,40 @@ public: { ZEN_INFO("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'", m_FileCasStrategy.m_RootDirectory, - Name.ShardedPath.ToUtf8()); + ChunkPath); } std::error_code Ec; - uint64_t SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec); + uint64_t SizeOnDisk = std::filesystem::file_size(ChunkPath, Ec); if (Ec) { SizeOnDisk = 0; } - bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec); + bool Existed = std::filesystem::remove(ChunkPath, Ec); + if (Ec) + { + // Target file may be open for read, attempt to move it to a temp file and mark it delete on close + IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath); + if (OldChunk) + { + std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString()); + std::filesystem::rename(ChunkPath, TempPath, Ec); + if (!Ec) + { + OldChunk.SetDeleteOnClose(true); + Existed = true; + } + } + } + if (Ec) { ZEN_WARN("GCV2: filecas [COMPACT] '{}': Failed deleting CAS payload file '{}'. Reason '{}'", m_FileCasStrategy.m_RootDirectory, - Name.ShardedPath.ToUtf8(), + ChunkPath, Ec.message()); continue; } + if (!Existed) { continue; @@ -1500,14 +1290,14 @@ public: else { std::error_code Ec; - bool Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec); + bool Existed = std::filesystem::is_regular_file(ChunkPath, Ec); if (Ec) { if (Ctx.Settings.Verbose) { ZEN_INFO("GCV2: filecas [COMPACT] '{}': Failed checking CAS payload file '{}'. 
Reason '{}'", m_FileCasStrategy.m_RootDirectory, - Name.ShardedPath.ToUtf8(), + ChunkPath, Ec.message()); } continue; @@ -1671,6 +1461,225 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&) #if ZEN_WITH_TESTS +TEST_CASE("cas.chunk.mismatch") +{ +} + +TEST_CASE("cas.chunk.moveoverwrite") +{ + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + { + std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; + + IoBuffer PayloadBlob(1024 * 1024); + memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize()); + CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob)); + { + WriteFile(Payload1Path, CompressedPayload1.GetCompressed()); + IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); + Payload1.SetDeleteOnClose(true); + CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash()); + CHECK_EQ(Result.New, true); + CHECK(!std::filesystem::exists(Payload1Path)); + } + { + std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"}; + WriteFile(Payload1BPath, CompressedPayload1.GetCompressed()); + IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath); + Payload1B.SetDeleteOnClose(true); + + CasStore::InsertResult Result = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash()); + CHECK_EQ(Result.New, false); + CHECK(std::filesystem::exists(Payload1BPath)); + Payload1B = {}; + CHECK(!std::filesystem::exists(Payload1BPath)); + } + + IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload); + CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize()); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize); + 
CHECK(ValidatePayload1); + } + + CompressedBuffer CompressedPayload2 = + CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None); + std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"}; + WriteFile(Payload2Path, CompressedPayload2.GetCompressed()); + IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path); + Payload2.SetDeleteOnClose(true); + { + CasStore::InsertResult Result = FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash()); + CHECK_EQ(Result.New, true); + } + + Payload2 = {}; + CHECK(!std::filesystem::exists(Payload2Path)); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize); + CHECK(ValidatePayload1); + } + + IoBuffer FetchedPayload2 = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload2); + CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize()); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize); + CHECK(ValidatePayload2); + } + } +} + +TEST_CASE("cas.chunk.copyoverwrite") +{ + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + { + std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; + + IoBuffer PayloadBlob(1024 * 1024); + memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize()); + CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob)); + { + WriteFile(Payload1Path, CompressedPayload1.GetCompressed()); + IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path); + Payload1.SetDeleteOnClose(true); + CasStore::InsertResult Result = + FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash(), 
CasStore::InsertMode::kCopyOnly); + CHECK_EQ(Result.New, true); + CHECK(std::filesystem::exists(Payload1Path)); + Payload1 = {}; + CHECK(!std::filesystem::exists(Payload1Path)); + } + { + std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"}; + WriteFile(Payload1BPath, CompressedPayload1.GetCompressed()); + IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath); + Payload1B.SetDeleteOnClose(true); + + CasStore::InsertResult Result = + FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly); + CHECK_EQ(Result.New, false); + CHECK(std::filesystem::exists(Payload1BPath)); + Payload1B = {}; + CHECK(!std::filesystem::exists(Payload1BPath)); + } + + IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload); + CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize()); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize); + CHECK(ValidatePayload1); + } + + CompressedBuffer CompressedPayload2 = + CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None); + std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"}; + WriteFile(Payload2Path, CompressedPayload2.GetCompressed()); + IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path); + Payload2.SetDeleteOnClose(true); + { + CasStore::InsertResult Result = + FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash(), CasStore::InsertMode::kCopyOnly); + CHECK_EQ(Result.New, true); + } + + Payload2 = {}; + CHECK(!std::filesystem::exists(Payload2Path)); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize); + CHECK(ValidatePayload1); + } + + IoBuffer FetchedPayload2 = 
FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload2); + CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize()); + + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize); + CHECK(ValidatePayload2); + } + } +} + +TEST_CASE("cas.chunk.recoverbadsize") +{ + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + + FileCasStrategy FileCas(Gc); + FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true); + + { + std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"}; + + IoBuffer PayloadBlob(1024 * 1024); + memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize()); + CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob)); + CompressedBuffer CompressedPayload2 = + CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None); + + CasStore::InsertResult Result = + FileCas.InsertChunk(CompressedPayload1.GetCompressed().Flatten().AsIoBuffer(), CompressedPayload1.DecodeRawHash()); + CHECK_EQ(Result.New, true); + + IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload); + CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize()); + + IoBufferFileReference Ref{nullptr, 0, 0}; + CHECK(FetchedPayload.GetFileReference(Ref)); + std::error_code Ec; + std::filesystem::path CASPath = PathFromHandle(Ref.FileHandle, Ec); + CHECK(!Ec); + FetchedPayload = {}; + + WriteFile(CASPath, CompressedPayload2.GetCompressed()); + + FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(FetchedPayload); + CHECK(FetchedPayload.GetSize() == CompressedPayload2.GetCompressedSize()); + FetchedPayload = {}; + + WriteFile(CASPath, IoBuffer(1024)); + CHECK(FileCas.HaveChunk(CompressedPayload1.DecodeRawHash())); + FetchedPayload = 
FileCas.FindChunk(CompressedPayload1.DecodeRawHash()); + CHECK(!FetchedPayload); + CHECK(!FileCas.HaveChunk(CompressedPayload1.DecodeRawHash())); + } +} + TEST_CASE("cas.file.move") { // specifying an absolute path here can be helpful when using procmon to dig into things diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index ff7126325..fb4b1888b 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -61,8 +61,6 @@ private: }; using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>; - CasStore::InsertResult InsertChunkData(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - LoggerRef m_Log; GcManager& m_Gc; std::filesystem::path m_RootDirectory; @@ -103,6 +101,8 @@ private: ExtendablePathBuilder<128> ShardedPath; }; + bool UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize); + friend class FileCasReferencePruner; friend class FileCasStoreCompactor; }; |