diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-24 13:20:59 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-11-24 04:20:59 -0800 |
| commit | 666a543ed82896c972526ef08476a41ccbfbd2c4 (patch) | |
| tree | 49a52941d9ced665431ebf320d0f7d0f4b6e5cfa /zenstore/filecas.cpp | |
| parent | Don't resize block store block file to max size at creation (#193) (diff) | |
| download | zen-666a543ed82896c972526ef08476a41ccbfbd2c4.tar.xz zen-666a543ed82896c972526ef08476a41ccbfbd2c4.zip | |
Fix disk usage stats (#194)
* Improve tracking of used disk space for filecas and compactcas
Add tracking of used disk space for project store
Remove ZenCacheStore as GcStorage/GcContributor
- underlying ZenCacheNamespace instances register themselves directly
- removing this also fixes double reporting of GcStorageSize for namespaces
* Update changelog
Diffstat (limited to 'zenstore/filecas.cpp')
| -rw-r--r-- | zenstore/filecas.cpp | 287 |
1 file changed, 214 insertions, 73 deletions
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 9825f225a..1b53c405b 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -97,32 +97,48 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); - std::unordered_set<IoHash> FoundEntries; - FoundEntries.reserve(10000); + m_KnownEntries.reserve(10000); m_CasLog.Replay( [&](const FileCasIndexEntry& Entry) { if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) { - if (!FoundEntries.contains(Entry.Key)) + if (m_KnownEntries.erase(Entry.Key) == 1u) { - return; + m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); } - m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); - FoundEntries.erase(Entry.Key); } else { - if (FoundEntries.contains(Entry.Key)) + if (m_KnownEntries.insert(Entry.Key).second) { - return; + m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); } - FoundEntries.insert(Entry.Key); - m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); } }, 0); } +#if ZEN_PLATFORM_WINDOWS +static void +DeletePayloadFileOnClose(const void* FileHandle) +{ + const HANDLE WinFileHandle = (const HANDLE)FileHandle; + // This will cause the file to be deleted when the last handle to it is closed + FILE_DISPOSITION_INFO Fdi{}; + Fdi.DeleteFile = TRUE; + BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + + if (!Success) + { + // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed + // to delete it. 
+ ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", + PathFromHandle(WinFileHandle), + GetLastErrorAsString()); + } +} +#endif + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { @@ -134,10 +150,12 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Mode == CasStore::InsertMode::kCopyOnly) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - if (std::filesystem::is_regular_file(Name.ShardedPath.ToPath())) { - return {.New = false}; + RwLock::SharedLockScope _(m_Lock); + if (m_KnownEntries.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } } return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } @@ -148,33 +166,40 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { + { + bool Exists = true; + { + RwLock::SharedLockScope _(m_Lock); + Exists = m_KnownEntries.contains(ChunkHash); + } + if (Exists) + { +#if ZEN_PLATFORM_WINDOWS + DeletePayloadFileOnClose(FileRef.FileHandle); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); + if (unlink(FilePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + FilePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } +#endif + return CasStore::InsertResult{.New = false}; + } + } + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS const HANDLE ChunkFileHandle = FileRef.FileHandle; - - auto DeletePayloadFileOnClose = [&] { - // This will cause the file to be deleted when the last handle to it is 
closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'", - PathFromHandle(ChunkFileHandle), - GetLastErrorAsString()); - } - }; - // See if file already exists - // - // Future improvement: maintain Bloom filter to avoid expensive file system probes? - { CAtlFile PayloadFile; @@ -184,20 +209,33 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: // and should contain the content we were about to insert // We do need to ensure the source file goes away on close, however - - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes)) + size_t ChunkSize = Chunk.GetSize(); + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) { - m_TotalSize.fetch_add(static_cast<int64_t>(FileSize)); + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(Chunk.Size()), std::memory_order::relaxed); + } + + DeletePayloadFileOnClose(ChunkFileHandle); + + return CasStore::InsertResult{.New = IsNew}; } else { - ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. 
Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); } - - DeletePayloadFileOnClose(); - - return CasStore::InsertResult{.New = false}; } else { @@ -240,7 +278,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); // Try to move file into place - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); if (!Success) @@ -286,31 +323,53 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Success) { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - return CasStore::InsertResult{.New = true}; + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; } const DWORD LastError = GetLastError(); if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) { - DeletePayloadFileOnClose(); + HashLock.ReleaseNow(); + DeletePayloadFileOnClose(ChunkFileHandle); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } - return CasStore::InsertResult{.New = false}; + return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LastError), ChunkHash); - DeletePayloadFileOnClose(); + DeletePayloadFileOnClose(ChunkFileHandle); #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = 
link(SourcePath.c_str(), DestPath.c_str()); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); if (Ret < 0 && zen::GetLastError() == ENOENT) { // Destination directory doesn't exist. Create it any try again. @@ -319,14 +378,14 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } int LinkError = zen::GetLastError(); - // Unlink the file. If the path to unlink didn't exist someone else - // beat us to it and that is hunky-dory. if (unlink(SourcePath.c_str()) < 0) { int UnlinkError = zen::GetLastError(); if (UnlinkError != ENOENT) { - ZEN_WARN("unlink of CAS payload file failed ('{}')", GetSystemErrorAsString(UnlinkError)); + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + SourcePath.string(), + GetSystemErrorAsString(UnlinkError)); } } @@ -336,7 +395,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { if (LinkError == EEXIST) { - return CasStore::InsertResult{.New = false}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", @@ -345,7 +414,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } else { - return CasStore::InsertResult{.New = true}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } #endif // ZEN_PLATFORM_* } @@ -361,8 +440,6 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize 
ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists - // - // Future improvement: maintain Bloom filter to avoid expensive file system probes? #if ZEN_PLATFORM_WINDOWS CAtlFile PayloadFile; @@ -374,8 +451,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the // content we were about to insert - m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize)); - return CasStore::InsertResult{.New = false}; + bool IsNew = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } PayloadFile.Close(); @@ -386,7 +471,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize } #endif - RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS // For now, use double-checked locking to see if someone else was first @@ -395,11 +480,31 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize if (SUCCEEDED(hRes)) { - // If we succeeded in opening the file then we don't need to do anything - // else because someone else managed to create the file before we did. Just return. + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + // If we succeeded in opening the file then and the size is correct we don't need to do anything + // else because someone else managed to create the file before we did. Just return. 
- m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize)); - return {.New = false}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } } if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) @@ -441,7 +546,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { case EEXIST: // Another thread has beat us to it so we're golden. - return {.New = false}; + { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return {.New = IsNew}; + } + break; case ENOENT: if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) @@ -455,7 +574,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); default: - ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath)); + ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); } } @@ -492,10 +611,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // *after* the lock is released due to the initialization order PayloadFile.Close(); - m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); m_CasLog.Append({.Key = ChunkHash, .Size = 
ChunkSize}); - return {.New = true}; + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + return {.New = IsNew}; } IoBuffer @@ -546,7 +676,10 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) if (!Ec) { - m_TotalSize.fetch_sub(FileSize); + if (m_KnownEntries.erase(ChunkHash) == 1u) + { + m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed); + } m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); } } @@ -721,6 +854,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; std::vector<IoHash> CandidateCas; + CandidateCas.resize(1); uint64_t DeletedCount = 0; uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); @@ -737,15 +871,22 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) }); IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { - bool KeepThis = false; - CandidateCas.clear(); - CandidateCas.push_back(Hash); + bool KeepThis = false; + CandidateCas[0] = Hash; GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); const uint64_t FileSize = Payload.FileSize(); + // Is this a file we did not track previously? + { + RwLock::ExclusiveLockScope _(m_Lock); + if (m_KnownEntries.insert(Hash).second) + { + m_TotalSize.fetch_add(FileSize, std::memory_order_relaxed); + } + } if (!KeepThis) { |