aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-22 10:58:20 +0100
committerGitHub Enterprise <[email protected]>2024-11-22 10:58:20 +0100
commit36658631e3f29b27d08b36fa6cf143d6902b7789 (patch)
tree8c618ec8081a40a479dc98dfd91b96b8b1e21e9c /src
parentfixed off-by-one in GetPidStatus (Linux) which might cause spurious errors (#... (diff)
downloadzen-36658631e3f29b27d08b36fa6cf143d6902b7789.tar.xz
zen-36658631e3f29b27d08b36fa6cf143d6902b7789.zip
fix inconsistencies in filecas due to failing to remove payload file during GC (#224)
make sure we rewrite filecas entries if chunk size changes (due to compression changes) hardening of move/write files in filecas if we encounter a filecas entry with mismatching size (due to pre-existing bug) we validate the file and update the index if we find a bad filecas file on disk we now attempt to remove it
Diffstat (limited to 'src')
-rw-r--r--src/zencore/filesystem.cpp4
-rw-r--r--src/zenstore/compactcas.cpp2
-rw-r--r--src/zenstore/filecas.cpp857
-rw-r--r--src/zenstore/filecas.h4
4 files changed, 438 insertions, 429 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 93383a656..9ca5f1131 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -902,14 +902,14 @@ MoveToFile(std::filesystem::path Path, IoBuffer Data)
{
return false;
}
- int Ret = link(SourcePath.c_str(), Path.c_str());
+ int Ret = rename(SourcePath.c_str(), Path.c_str());
if (Ret < 0)
{
int32_t err = errno;
if (err == ENOENT)
{
zen::CreateDirectories(Path.parent_path());
- Ret = link(SourcePath.c_str(), Path.c_str());
+ Ret = rename(SourcePath.c_str(), Path.c_str());
}
}
if (Ret < 0)
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 7f1300177..b3309f7a7 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -257,7 +257,7 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash>
RwLock::ExclusiveLockScope _(m_LocationMapLock);
for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries)
{
- m_LocationMap.emplace(DiskIndexEntry.Key, m_Locations.size());
+ m_LocationMap.insert_or_assign(DiskIndexEntry.Key, m_Locations.size());
m_Locations.push_back(DiskIndexEntry.Location);
}
}
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 6d5bcff96..2031804c9 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -96,7 +96,7 @@ namespace filecas::impl {
FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
{
- ShardedPath.Append(RootPath.c_str());
+ ShardedPath.Append(RootPath);
ExtendableStringBuilder<64> HashString;
ChunkHash.ToHexString(HashString);
@@ -245,6 +245,28 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
}
}
+bool
+FileCasStrategy::UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize)
+{
+ uint64 OldChunkSize = 0;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+ {
+ OldChunkSize = It->second.Size;
+ if (OldChunkSize == ChunkSize)
+ {
+ return false;
+ }
+ }
+ m_Index.insert_or_assign(ChunkHash, IndexEntry{.Size = ChunkSize});
+ }
+ m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+ m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed);
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ return true;
+}
+
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
@@ -256,437 +278,133 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
- if (Mode == CasStore::InsertMode::kCopyOnly)
+ uint64_t ChunkSize = Chunk.GetSize();
{
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
{
- RwLock::SharedLockScope _(m_Lock);
- if (m_Index.contains(ChunkHash))
+ if (It->second.Size == ChunkSize)
{
return CasStore::InsertResult{.New = false};
}
}
- return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash);
}
- // File-based chunks have special case handling whereby we move the file into
- // place in the file store directory, thus avoiding unnecessary copying
-
- IoBufferFileReference FileRef;
- bool IsWholeFile = Chunk.IsWholeFile();
- if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef))
- {
- ZEN_TRACE_CPU("FileCas::InsertChunk::Move");
-
- {
- bool Exists = true;
- {
- RwLock::SharedLockScope _(m_Lock);
- Exists = m_Index.contains(ChunkHash);
- }
- if (Exists)
- {
- return CasStore::InsertResult{.New = false};
- }
- }
-
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
-
-#if ZEN_PLATFORM_WINDOWS
- const HANDLE ChunkFileHandle = FileRef.FileHandle;
- // See if file already exists
- {
- ZEN_TRACE_CPU("FileCas::InsertChunk::Exists");
-
- windows::FileHandle PayloadFile;
-
- if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
- {
- // If we succeeded in opening the target file then we don't need to do anything else because it already exists
- // and should contain the content we were about to insert
-
- // We do need to ensure the source file goes away on close, however
- size_t ChunkSize = Chunk.GetSize();
- uint64_t FileSize = 0;
- if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
- {
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
-
- return CasStore::InsertResult{.New = IsNew};
- }
- else
- {
- ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
- Name.ShardedPath.ToUtf8(),
- ChunkSize,
- FileSize);
- }
- }
- else
- {
- if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
- {
- // Shard directory does not exist
- }
- else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
- {
- // Shard directory exists, but not the file
- }
- else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
- {
- // Sharing violation, likely because we are trying to open a file
- // which has been renamed on another thread, and the file handle
- // used to rename it is still open. We handle this case below
- // instead of here
- }
- else
- {
- ZEN_INFO("Unexpected error opening file '{}': {}", Name.ShardedPath.ToUtf8(), hRes);
- }
- }
- }
-
- std::filesystem::path FullPath(Name.ShardedPath.c_str());
-
- std::filesystem::path FilePath = FullPath.parent_path();
- std::wstring FileName = FullPath.native();
-
- const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR));
- FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize));
- memset(RenameInfo, 0, BufferSize);
-
- RenameInfo->ReplaceIfExists = FALSE;
- RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size());
- memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR));
- RenameInfo->FileName[FileName.size()] = 0;
-
- auto $ = MakeGuard([&] { Memory::Free(RenameInfo); });
-
- // Try to move file into place
- BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
-
- if (!Success)
- {
- // The rename/move could fail because the target directory does not yet exist. This code attempts
- // to create it
-
- windows::FileHandle DirHandle;
-
- auto InternalCreateDirectoryHandle = [&] {
- return DirHandle.Create(FilePath.c_str(),
- GENERIC_READ | GENERIC_WRITE,
- FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
- OPEN_EXISTING,
- FILE_FLAG_BACKUP_SEMANTICS);
- };
-
- // It's possible for several threads to enter this logic trying to create the same
- // directory. Only one will create the directory of course, but all threads will
- // make it through okay
-
- HRESULT hRes = InternalCreateDirectoryHandle();
-
- if (FAILED(hRes))
- {
- // TODO: we can handle directory creation more intelligently and efficiently than
- // this currently does
-
- CreateDirectories(FilePath.c_str());
-
- hRes = InternalCreateDirectoryHandle();
- }
-
- if (FAILED(hRes))
- {
- ThrowSystemException(hRes, fmt::format("Failed to open shard directory '{}'", FilePath));
- }
-
- // Retry rename/move
-
- Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
- }
-
- if (Success)
- {
- m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
- Chunk.SetDeleteOnClose(false);
+ ShardingHelper Name(m_RootDirectory, ChunkHash);
+ RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
- HashLock.ReleaseNow();
+ const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-
- const DWORD LastError = GetLastError();
+ if (Mode == CasStore::InsertMode::kMayBeMovedInPlace)
+ {
+ // File-based chunks have special case handling whereby we move the file into
+ // place in the file store directory, thus avoiding unnecessary copying
- if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
+ if (MoveToFile(ChunkPath, Chunk))
{
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
-
+ bool IsNew = UpdateIndex(ChunkHash, Chunk.Size());
return CasStore::InsertResult{.New = IsNew};
}
- ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
- GetSystemErrorAsString(LastError),
- ChunkHash);
+ }
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ // Target file may be open for read, attempt to move it to a temp file and mark it as delete on close
+ IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ if (OldChunk)
+ {
+ std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());
std::error_code Ec;
- std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle, Ec);
+ std::filesystem::rename(ChunkPath, TempPath, Ec);
if (Ec)
{
- ZEN_WARN("link of CAS payload file {} failed ('{}'), falling back to regular write for insert of {}",
- FileRef.FileHandle,
- Ec.message(),
- ChunkHash);
+ throw std::system_error(Ec, fmt::format("unable to move existing CAS file {} to {}", ChunkPath, TempPath));
}
- else
+ OldChunk.SetDeleteOnClose(true);
+
+ if (Mode == CasStore::InsertMode::kMayBeMovedInPlace)
{
- std::filesystem::path DestPath = Name.ShardedPath.c_str();
- int Ret = link(SourcePath.c_str(), DestPath.c_str());
- if (Ret < 0 && zen::GetLastError() == ENOENT)
- {
- // Destination directory doesn't exist. Create it any try again.
- CreateDirectories(DestPath.parent_path().c_str());
- Ret = link(SourcePath.c_str(), DestPath.c_str());
- }
- if (Ret == 0)
+ if (MoveToFile(ChunkPath, Chunk))
{
- m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
- Chunk.SetDeleteOnClose(false);
-
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
+ bool IsNew = UpdateIndex(ChunkHash, Chunk.Size());
return CasStore::InsertResult{.New = IsNew};
}
- else
- {
- int LinkError = zen::GetLastError();
-
- // It is possible that someone beat us to it in linking the file. In that
- // case a "file exists" error is okay. All others are not.
- if (LinkError == EEXIST)
- {
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-
- ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
- GetSystemErrorAsString(LinkError),
- ChunkHash);
- }
}
-#endif // ZEN_PLATFORM_*
}
- return InsertChunkData(Chunk.Data(), Chunk.Size(), ChunkHash);
-}
-
-CasStore::InsertResult
-FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
-{
- ZEN_TRACE_CPU("FileCas::InsertChunkData");
-
- ZEN_ASSERT(m_IsInitialized);
-
+ // We moved the file, make sure the index reflects that
{
- RwLock::SharedLockScope _(m_Lock);
- if (m_Index.contains(ChunkHash))
- {
- return {.New = false};
- }
- }
-
- ZEN_TRACE_CPU("FileCas::InsertChunkData::Write");
-
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- // See if file already exists
-
-#if ZEN_PLATFORM_WINDOWS
- windows::FileHandle PayloadFile;
-
- HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
-
- if (SUCCEEDED(hRes))
- {
- // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
- // content we were about to insert
-
- bool IsNew = false;
+ uint64 OldChunkSize = (uint64)-1;
{
RwLock::ExclusiveLockScope _(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
-
- PayloadFile.Close();
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- if (access(Name.ShardedPath.c_str(), F_OK) == 0)
- {
- return CasStore::InsertResult{.New = false};
- }
-#endif
-
- RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
-
-#if ZEN_PLATFORM_WINDOWS
- // For now, use double-checked locking to see if someone else was first
-
- hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
-
- if (SUCCEEDED(hRes))
- {
- uint64_t FileSize = 0;
- if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
- {
- // If we succeeded in opening the file then and the size is correct we don't need to do anything
- // else because someone else managed to create the file before we did. Just return.
-
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
{
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ OldChunkSize = It->second.Size;
+ m_Index.erase(It);
}
- return CasStore::InsertResult{.New = IsNew};
}
- else
+ if (OldChunkSize != (uint64)-1)
{
- ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
- Name.ShardedPath.ToUtf8(),
- ChunkSize,
- FileSize);
- PayloadFile.Close();
+ m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = OldChunkSize});
+ m_TotalSize.fetch_sub(OldChunkSize, std::memory_order::relaxed);
}
}
- else if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
- {
- ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
- }
- auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
-
- hRes = InternalCreateFile();
+ // We need to do a copy of the source data
+#if ZEN_PLATFORM_WINDOWS
+ windows::FileHandle PayloadFile;
+ auto InternalCreateFile = [&] { return PayloadFile.Create(ChunkPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
+ HRESULT hRes = InternalCreateFile();
if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
{
// Ensure parent directories exist and retry file creation
- CreateDirectories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
+ CreateDirectories(ChunkPath.parent_path());
hRes = InternalCreateFile();
}
-
if (FAILED(hRes))
{
- ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}'", Name.ShardedPath.ToUtf8()));
+ ThrowSystemException(hRes, fmt::format("Failed to open shard file '{}' for write", ChunkPath));
}
+ auto WriteBytes = [&](windows::FileHandle& PayloadFile, const void* Cursor, uint32_t Size) {
+ HRESULT WriteRes = PayloadFile.Write(Cursor, Size);
+ if (FAILED(WriteRes))
+ {
+ ThrowSystemException(hRes, fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath));
+ }
+ };
#else
// Attempt to exclusively create the file.
auto InternalCreateFile = [&] {
- int Fd = open(Name.ShardedPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
+ int Fd = open(ChunkPath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_CLOEXEC, 0666);
if (Fd >= 0)
{
fchmod(Fd, 0666);
}
return Fd;
};
- int Fd = InternalCreateFile();
- if (Fd < 0)
+ int Fd = InternalCreateFile();
+ int Error = (Fd < 0) ? errno : 0;
+ if (Error == ENOENT)
{
- switch (zen::GetLastError())
- {
- case EEXIST:
- // Another thread has beat us to it so we're golden.
- {
- HashLock.ReleaseNow();
-
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
- }
- return {.New = IsNew};
- }
- break;
-
- case ENOENT:
- if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
- {
- Fd = InternalCreateFile();
- if (Fd >= 0)
- {
- break;
- }
- }
- ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));
-
- default:
- ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
- }
+ CreateDirectories(ChunkPath.parent_path());
+ Fd = InternalCreateFile();
+ Error = Fd < 0 ? errno : 0;
+ }
+ if (Error)
+ {
+ ThrowSystemError(Error, fmt::format("Failed to open shard file '{}' for write", ChunkPath));
}
struct FdWrapper
{
~FdWrapper() { Close(); }
- void Write(const void* Cursor, size_t Size) { (void)!write(Fd, Cursor, Size); }
+ void Write(const void* Cursor, uint32_t Size)
+ {
+ ssize_t Written = write(Fd, Cursor, Size);
+ if (Written == -1)
+ {
+ ThrowLastError(fmt::format("failed to write {} bytes to shard file '{}'", ChunkSize, ChunkPath));
+ }
+ }
void Close()
{
if (Fd >= 0)
@@ -695,42 +413,36 @@ FileCasStrategy::InsertChunkData(const void* const ChunkData, const size_t Chunk
Fd = -1;
}
}
- int Fd;
- } PayloadFile = {Fd};
-#endif // ZEN_PLATFORM_WINDOWS
+ const char* ChunkPath = nullptr;
+ uint64 ChunkSize = 0;
+ int Fd;
+ } PayloadFile = {.ChunkPath = ChunkPath.c_str(), .ChunkSize = ChunkSize, .Fd = Fd};
- size_t ChunkRemain = ChunkSize;
- auto ChunkCursor = reinterpret_cast<const uint8_t*>(ChunkData);
-
- while (ChunkRemain != 0)
- {
- uint32_t ByteCount = uint32_t(std::min<size_t>(4 * 1024 * 1024ull, ChunkRemain));
-
- PayloadFile.Write(ChunkCursor, ByteCount);
-
- ChunkCursor += ByteCount;
- ChunkRemain -= ByteCount;
- }
-
- // We cannot rely on RAII to close the file handle since it would be closed
- // *after* the lock is released due to the initialization order
- PayloadFile.Close();
-
- m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
+ auto WriteBytes = [](FdWrapper& PayloadFile, const void* Cursor, uint32_t Size) { PayloadFile.Write(Cursor, Size); };
+#endif // ZEN_PLATFORM_WINDOWS
- HashLock.ReleaseNow();
+ uint64_t ChunkRemain = ChunkSize;
+ auto ChunkCursor = reinterpret_cast<const uint8_t*>(Chunk.GetData());
- bool IsNew = false;
+ try
{
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = ChunkSize}}).second;
+ while (ChunkRemain != 0)
+ {
+ uint32_t ByteCount = uint32_t(std::min<uint64_t>(4 * 1024 * 1024ull, ChunkRemain));
+ WriteBytes(PayloadFile, ChunkCursor, ByteCount);
+ ChunkCursor += ByteCount;
+ ChunkRemain -= ByteCount;
+ }
}
- if (IsNew)
+ catch (const std::exception Ex)
{
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ PayloadFile.Close();
+ std::error_code DummyEc;
+ std::filesystem::remove(ChunkPath, DummyEc);
+ throw;
}
-
- return {.New = IsNew};
+ bool IsNew = UpdateIndex(ChunkHash, Chunk.Size());
+ return CasStore::InsertResult{.New = IsNew};
}
IoBuffer
@@ -753,12 +465,72 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
}
}
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
- RwLock::SharedLockScope _(LockForHash(ChunkHash));
+ ShardingHelper Name(m_RootDirectory, ChunkHash);
+ const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
+
+ RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
- if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); Chunk.GetSize() == ExpectedSize)
+ if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk)
{
- return Chunk;
+ uint64 ChunkSize = Chunk.GetSize();
+ if (ChunkSize == ExpectedSize)
+ {
+ return Chunk;
+ }
+
+ IoHash RawHash;
+ uint64 RawSize;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize); Compressed)
+ {
+ if (Compressed.GetCompressedSize() == ChunkSize)
+ {
+ // The index is wrong due to a previous problem where we failed to delete a payload due to other process locking it.
+ // The commit that adds this recovery also tightens up the problem where we didn't update the new filesize in the index
+ // when overwriting a payload with same RawHash but a different size (we failed to properly update the index).
+ // This can happen when the compression algorithm changes/is updated
+ UpdateIndex(ChunkHash, ChunkSize);
+ ZEN_WARN("FileCas recovered file {} with size {}, expected size {}", ChunkPath, ChunkSize, ExpectedSize);
+ return Chunk;
+ }
+ else
+ {
+ ZEN_WARN("FileCas file {} compressed size {} does not match size on disk {}",
+ ChunkPath,
+ Compressed.GetCompressedSize(),
+ ChunkSize);
+ }
+ }
+ else
+ {
+ ZEN_WARN("FileCas file {} does not have a valid compressed buffer header", ChunkPath);
+ }
+
+ {
+ std::error_code Ec;
+ std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());
+ std::filesystem::rename(ChunkPath, TempPath, Ec);
+ if (!Ec)
+ {
+ Chunk.SetDeleteOnClose(true);
+ ZEN_INFO("FileCas deleted malformed file {}", ChunkPath);
+ }
+ }
+
+ // Remove the offending file from the index so we don't try to read from it again
+ bool WasRemoved = false;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end() && It->second.Size == ExpectedSize)
+ {
+ m_TotalSize.fetch_sub(It->second.Size, std::memory_order_relaxed);
+ m_Index.erase(It);
+ WasRemoved = true;
+ }
+ }
+ if (WasRemoved)
+ {
+ m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = ChunkSize});
+ }
}
return {};
@@ -778,19 +550,19 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
{
ZEN_TRACE_CPU("FileCas::DeleteChunk");
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
-
- uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(Name.ShardedPath.c_str(), Ec));
+ ShardingHelper Name(m_RootDirectory, ChunkHash);
+ const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
+ uint64_t FileSize = static_cast<uint64_t>(std::filesystem::file_size(ChunkPath, Ec));
if (Ec)
{
- ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
+ ZEN_WARN("get file size FAILED, file cas '{}'", ChunkPath);
FileSize = 0;
}
- ZEN_DEBUG("deleting CAS payload file '{}' {}", Name.ShardedPath.ToUtf8(), NiceBytes(FileSize));
- std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+ ZEN_DEBUG("deleting CAS payload file '{}' {}", ChunkPath, NiceBytes(FileSize));
+ std::filesystem::remove(ChunkPath, Ec);
- if (!Ec || !std::filesystem::exists(Name.ShardedPath.c_str()))
+ if (!Ec || !std::filesystem::exists(ChunkPath))
{
{
RwLock::ExclusiveLockScope _(m_Lock);
@@ -840,8 +612,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (!FoundChunkIndexes.empty())
{
auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
if (Payload)
{
if (!AsyncCallback(ChunkIndex, std::move(Payload)))
@@ -906,8 +678,8 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&&
for (const auto& It : m_Index)
{
const IoHash& NameHash = It.first;
- ShardingHelper Name(m_RootDirectory.c_str(), NameHash);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
+ ShardingHelper Name(m_RootDirectory, NameHash);
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
Callback(NameHash, std::move(Payload));
}
}
@@ -962,7 +734,7 @@ FileCasStrategy::ScrubStorage(ScrubContext& Ctx)
RwLock::ExclusiveLockScope _(m_Lock);
for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
{
- if (m_Index.insert({Entry.Key, {.Size = Entry.Size}}).second)
+ if (m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}).second)
{
m_TotalSize.fetch_add(static_cast<uint64_t>(Entry.Size), std::memory_order::relaxed);
m_CasLog.Append({.Key = Entry.Key, .Size = Entry.Size});
@@ -1455,7 +1227,8 @@ public:
size_t Skipped = 0;
for (const IoHash& ChunkHash : m_ReferencesToClean)
{
- FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory.c_str(), ChunkHash);
+ FileCasStrategy::ShardingHelper Name(m_FileCasStrategy.m_RootDirectory, ChunkHash);
+ const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
{
RwLock::SharedLockScope __(m_FileCasStrategy.m_Lock);
if (auto It = m_FileCasStrategy.m_Index.find(ChunkHash); It != m_FileCasStrategy.m_Index.end())
@@ -1474,23 +1247,40 @@ public:
{
ZEN_INFO("GCV2: filecas [COMPACT] '{}': Deleting CAS payload file '{}'",
m_FileCasStrategy.m_RootDirectory,
- Name.ShardedPath.ToUtf8());
+ ChunkPath);
}
std::error_code Ec;
- uint64_t SizeOnDisk = std::filesystem::file_size(Name.ShardedPath.c_str(), Ec);
+ uint64_t SizeOnDisk = std::filesystem::file_size(ChunkPath, Ec);
if (Ec)
{
SizeOnDisk = 0;
}
- bool Existed = std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+ bool Existed = std::filesystem::remove(ChunkPath, Ec);
+ if (Ec)
+ {
+ // Target file may be open for read, attempt to move it to a temp file and mark it delete on close
+ IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ if (OldChunk)
+ {
+ std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());
+ std::filesystem::rename(ChunkPath, TempPath, Ec);
+ if (!Ec)
+ {
+ OldChunk.SetDeleteOnClose(true);
+ Existed = true;
+ }
+ }
+ }
+
if (Ec)
{
ZEN_WARN("GCV2: filecas [COMPACT] '{}': Failed deleting CAS payload file '{}'. Reason '{}'",
m_FileCasStrategy.m_RootDirectory,
- Name.ShardedPath.ToUtf8(),
+ ChunkPath,
Ec.message());
continue;
}
+
if (!Existed)
{
continue;
@@ -1500,14 +1290,14 @@ public:
else
{
std::error_code Ec;
- bool Existed = std::filesystem::is_regular_file(Name.ShardedPath.c_str(), Ec);
+ bool Existed = std::filesystem::is_regular_file(ChunkPath, Ec);
if (Ec)
{
if (Ctx.Settings.Verbose)
{
ZEN_INFO("GCV2: filecas [COMPACT] '{}': Failed checking CAS payload file '{}'. Reason '{}'",
m_FileCasStrategy.m_RootDirectory,
- Name.ShardedPath.ToUtf8(),
+ ChunkPath,
Ec.message());
}
continue;
@@ -1671,6 +1461,225 @@ FileCasStrategy::CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats&)
#if ZEN_WITH_TESTS
+TEST_CASE("cas.chunk.mismatch")
+{
+}
+
+TEST_CASE("cas.chunk.moveoverwrite")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+ {
+ std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+
+ IoBuffer PayloadBlob(1024 * 1024);
+ memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
+ CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
+ {
+ WriteFile(Payload1Path, CompressedPayload1.GetCompressed());
+ IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
+ Payload1.SetDeleteOnClose(true);
+ CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash());
+ CHECK_EQ(Result.New, true);
+ CHECK(!std::filesystem::exists(Payload1Path));
+ }
+ {
+ std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"};
+ WriteFile(Payload1BPath, CompressedPayload1.GetCompressed());
+ IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath);
+ Payload1B.SetDeleteOnClose(true);
+
+ CasStore::InsertResult Result = FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash());
+ CHECK_EQ(Result.New, false);
+ CHECK(std::filesystem::exists(Payload1BPath));
+ Payload1B = {};
+ CHECK(!std::filesystem::exists(Payload1BPath));
+ }
+
+ IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload);
+ CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
+ CHECK(ValidatePayload1);
+ }
+
+ CompressedBuffer CompressedPayload2 =
+ CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"};
+ WriteFile(Payload2Path, CompressedPayload2.GetCompressed());
+ IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path);
+ Payload2.SetDeleteOnClose(true);
+ {
+ CasStore::InsertResult Result = FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash());
+ CHECK_EQ(Result.New, true);
+ }
+
+ Payload2 = {};
+ CHECK(!std::filesystem::exists(Payload2Path));
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
+ CHECK(ValidatePayload1);
+ }
+
+ IoBuffer FetchedPayload2 = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload2);
+ CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize());
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize);
+ CHECK(ValidatePayload2);
+ }
+ }
+}
+
+TEST_CASE("cas.chunk.copyoverwrite")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+ {
+ std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+
+ IoBuffer PayloadBlob(1024 * 1024);
+ memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
+ CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
+ {
+ WriteFile(Payload1Path, CompressedPayload1.GetCompressed());
+ IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
+ Payload1.SetDeleteOnClose(true);
+ CasStore::InsertResult Result =
+ FileCas.InsertChunk(Payload1, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
+ CHECK_EQ(Result.New, true);
+ CHECK(std::filesystem::exists(Payload1Path));
+ Payload1 = {};
+ CHECK(!std::filesystem::exists(Payload1Path));
+ }
+ {
+ std::filesystem::path Payload1BPath{TempDir.Path() / "payload_1"};
+ WriteFile(Payload1BPath, CompressedPayload1.GetCompressed());
+ IoBuffer Payload1B = IoBufferBuilder::MakeFromTemporaryFile(Payload1BPath);
+ Payload1B.SetDeleteOnClose(true);
+
+ CasStore::InsertResult Result =
+ FileCas.InsertChunk(Payload1B, CompressedPayload1.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
+ CHECK_EQ(Result.New, false);
+ CHECK(std::filesystem::exists(Payload1BPath));
+ Payload1B = {};
+ CHECK(!std::filesystem::exists(Payload1BPath));
+ }
+
+ IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload);
+ CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
+ CHECK(ValidatePayload1);
+ }
+
+ CompressedBuffer CompressedPayload2 =
+ CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ std::filesystem::path Payload2Path{TempDir.Path() / "payload_2"};
+ WriteFile(Payload2Path, CompressedPayload2.GetCompressed());
+ IoBuffer Payload2 = IoBufferBuilder::MakeFromTemporaryFile(Payload2Path);
+ Payload2.SetDeleteOnClose(true);
+ {
+ CasStore::InsertResult Result =
+ FileCas.InsertChunk(Payload2, CompressedPayload2.DecodeRawHash(), CasStore::InsertMode::kCopyOnly);
+ CHECK_EQ(Result.New, true);
+ }
+
+ Payload2 = {};
+ CHECK(!std::filesystem::exists(Payload2Path));
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload1 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload), RawHash, RawSize);
+ CHECK(ValidatePayload1);
+ }
+
+ IoBuffer FetchedPayload2 = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload2);
+ CHECK(FetchedPayload2.GetSize() == CompressedPayload2.GetCompressedSize());
+
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer ValidatePayload2 = CompressedBuffer::FromCompressed(SharedBuffer(FetchedPayload2), RawHash, RawSize);
+ CHECK(ValidatePayload2);
+ }
+ }
+}
+
+TEST_CASE("cas.chunk.recoverbadsize")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+
+ FileCasStrategy FileCas(Gc);
+ FileCas.Initialize(TempDir.Path() / "cas", /* IsNewStore */ true);
+
+ {
+ std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+
+ IoBuffer PayloadBlob(1024 * 1024);
+ memset(PayloadBlob.GetMutableView().GetData(), 0, PayloadBlob.GetSize());
+ CompressedBuffer CompressedPayload1 = CompressedBuffer::Compress(SharedBuffer(PayloadBlob));
+ CompressedBuffer CompressedPayload2 =
+ CompressedBuffer::Compress(SharedBuffer(PayloadBlob), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+
+ CasStore::InsertResult Result =
+ FileCas.InsertChunk(CompressedPayload1.GetCompressed().Flatten().AsIoBuffer(), CompressedPayload1.DecodeRawHash());
+ CHECK_EQ(Result.New, true);
+
+ IoBuffer FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload);
+ CHECK(FetchedPayload.GetSize() == CompressedPayload1.GetCompressedSize());
+
+ IoBufferFileReference Ref{nullptr, 0, 0};
+ CHECK(FetchedPayload.GetFileReference(Ref));
+ std::error_code Ec;
+ std::filesystem::path CASPath = PathFromHandle(Ref.FileHandle, Ec);
+ CHECK(!Ec);
+ FetchedPayload = {};
+
+ WriteFile(CASPath, CompressedPayload2.GetCompressed());
+
+ FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(FetchedPayload);
+ CHECK(FetchedPayload.GetSize() == CompressedPayload2.GetCompressedSize());
+ FetchedPayload = {};
+
+ WriteFile(CASPath, IoBuffer(1024));
+ CHECK(FileCas.HaveChunk(CompressedPayload1.DecodeRawHash()));
+ FetchedPayload = FileCas.FindChunk(CompressedPayload1.DecodeRawHash());
+ CHECK(!FetchedPayload);
+ CHECK(!FileCas.HaveChunk(CompressedPayload1.DecodeRawHash()));
+ }
+}
+
TEST_CASE("cas.file.move")
{
// specifying an absolute path here can be helpful when using procmon to dig into things
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index ff7126325..fb4b1888b 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -61,8 +61,6 @@ private:
};
using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;
- CasStore::InsertResult InsertChunkData(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
-
LoggerRef m_Log;
GcManager& m_Gc;
std::filesystem::path m_RootDirectory;
@@ -103,6 +101,8 @@ private:
ExtendablePathBuilder<128> ShardedPath;
};
+ bool UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize);
+
friend class FileCasReferencePruner;
friend class FileCasStoreCompactor;
};