diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-24 13:20:59 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-11-24 04:20:59 -0800 |
| commit | 666a543ed82896c972526ef08476a41ccbfbd2c4 (patch) | |
| tree | 49a52941d9ced665431ebf320d0f7d0f4b6e5cfa /zenstore | |
| parent | Don't resize block store block file to max size at creation (#193) (diff) | |
| download | zen-666a543ed82896c972526ef08476a41ccbfbd2c4.tar.xz zen-666a543ed82896c972526ef08476a41ccbfbd2c4.zip | |
Fix disk usage stats (#194)
* Improve tracking of used disk space for filecas and compactcas
Add tracking of used disk space for project store
Remove ZenCacheStore as GcStorage/GcContributor
- underlying ZenCacheNamespace instances register themselves directly
- removing this also fixes double reporting of GcStorageSize for namespaces
* changelog
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/blockstore.cpp | 25 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 29 | ||||
| -rw-r--r-- | zenstore/compactcas.h | 4 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 287 | ||||
| -rw-r--r-- | zenstore/filecas.h | 18 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 4 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 5 |
7 files changed, 255 insertions, 117 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 682cc4472..c8bf482fa 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -95,12 +95,6 @@ BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) } void -BlockStoreFile::Truncate(uint64_t Size) -{ - m_File.SetFileSize(Size); -} - -void BlockStoreFile::Flush() { m_File.Flush(); @@ -130,6 +124,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, ZEN_ASSERT(MaxBlockCount > 0); ZEN_ASSERT(IsPow2(MaxBlockCount)); + m_TotalSize = 0; m_BlocksBasePath = BlocksBasePath; m_MaxBlockSize = MaxBlockSize; @@ -184,6 +179,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, } Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path); BlockFile->Open(); + m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); m_ChunkBlocks[BlockIndex] = BlockFile; } } @@ -248,13 +244,15 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, cons m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; } - uint64_t InsertOffset = m_CurrentInsertOffset; - m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); - Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset; + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; m_ActiveWriteBlocks.push_back(WriteBlockIndex); InsertLock.ReleaseNow(); WriteBlock->Write(Data, Size, InsertOffset); + m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); @@ -477,6 +475,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, }); m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } continue; @@ -496,7 +495,6 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, if (NewBlockFile) { - NewBlockFile->Truncate(WriteOffset); NewBlockFile->Flush(); NewBlockFile = nullptr; } @@ -566,12 +564,13 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); - WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + uint64_t OldOffset = WriteOffset; + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); } Chunk.clear(); if (NewBlockFile) { - NewBlockFile->Truncate(WriteOffset); NewBlockFile->Flush(); NewBlockFile = nullptr; } @@ -596,6 +595,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, }); m_ChunkBlocks[BlockIndex] = nullptr; ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); OldBlockFile->MarkAsDeleteOnClose(); } } @@ -606,6 +606,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, if (NewBlockFile) { ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); NewBlockFile->MarkAsDeleteOnClose(); } } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 519478356..d0c2f59ac 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -176,7 +176,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const m_LocationMap.emplace(ChunkHash, DiskLocation); } }); - m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); return CasStore::InsertResult{.New = true}; } @@ -484,8 +483,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx) if (Entry.Flags & CasDiskIndexEntry::kTombstone) { m_LocationMap.erase(Entry.Key); - uint64_t ChunkSize = Entry.Location.GetSize(); - m_TotalSize.fetch_sub(ChunkSize); continue; } m_LocationMap[Entry.Key] = Entry.Location; @@ -686,7 +683,6 @@ void CasContainerStrategy::OpenContainer(bool IsNewStore) { // Add .running file and delete on clean on close to detect bad termination - m_TotalSize = 0; m_LocationMap.clear(); @@ -710,7 +706,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& Location = Entry.second; - m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed); KnownLocations.push_back(Location.Get(m_PayloadAlignment)); } @@ -991,8 +986,6 @@ TEST_CASE("compactcas.gc.compact") CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); - uint64_t InitialSize = Cas.StorageSize().DiskSize; - // Keep first and last { GcContext GcCtx; @@ -1018,15 +1011,15 @@ TEST_CASE("compactcas.gc.compact") CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - } - Cas.InsertChunk(Chunks[1], ChunkHashes[1]); - Cas.InsertChunk(Chunks[2], ChunkHashes[2]); - Cas.InsertChunk(Chunks[3], ChunkHashes[3]); - Cas.InsertChunk(Chunks[4], ChunkHashes[4]); - Cas.InsertChunk(Chunks[5], ChunkHashes[5]); - Cas.InsertChunk(Chunks[6], ChunkHashes[6]); - Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + Cas.InsertChunk(Chunks[1], ChunkHashes[1]); + Cas.InsertChunk(Chunks[2], ChunkHashes[2]); + Cas.InsertChunk(Chunks[3], ChunkHashes[3]); + Cas.InsertChunk(Chunks[4], ChunkHashes[4]); + Cas.InsertChunk(Chunks[5], ChunkHashes[5]); + Cas.InsertChunk(Chunks[6], ChunkHashes[6]); + Cas.InsertChunk(Chunks[7], ChunkHashes[7]); + } // Keep last { @@ -1177,9 +1170,6 @@ TEST_CASE("compactcas.gc.compact") CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); - - uint64_t FinalSize = Cas.StorageSize().DiskSize; - CHECK(InitialSize == FinalSize); } } @@ -1346,7 +1336,8 @@ TEST_CASE("compactcas.threadedinsert") WorkCompleted = 0; const uint64_t TotalSize = Cas.StorageSize().DiskSize; - CHECK_EQ(ExpectedSize, TotalSize); + CHECK_LE(ExpectedSize, TotalSize); + CHECK_GE(ExpectedSize + 32768, TotalSize); { for (const auto& Chunk : Chunks) diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 2acac7ca3..3d9c42c1b 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -63,7 +63,7 @@ struct CasContainerStrategy final : public GcStorage void Flush(); void Scrub(ScrubContext& Ctx); virtual void CollectGarbage(GcContext& GcCtx) override; - virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::acquire)}; } + virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_BlockStore.TotalSize()}; } private: CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); @@ -87,8 +87,6 @@ private: RwLock m_LocationMapLock; typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t; LocationMap_t m_LocationMap; - - std::atomic_uint64_t m_TotalSize{}; }; void compactcas_forcelink(); diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 9825f225a..1b53c405b 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -97,32 +97,48 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed))); }); - std::unordered_set<IoHash> FoundEntries; - FoundEntries.reserve(10000); + m_KnownEntries.reserve(10000); m_CasLog.Replay( [&](const FileCasIndexEntry& Entry) { if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone)) { - if (!FoundEntries.contains(Entry.Key)) + if (m_KnownEntries.erase(Entry.Key) == 1u) { - return; + m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); } - m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed); - FoundEntries.erase(Entry.Key); } else { - if (FoundEntries.contains(Entry.Key)) + if (m_KnownEntries.insert(Entry.Key).second) { - return; + m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); } - FoundEntries.insert(Entry.Key); - m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed); } }, 0); } +#if ZEN_PLATFORM_WINDOWS +static void +DeletePayloadFileOnClose(const void* FileHandle) +{ + const HANDLE WinFileHandle = (const HANDLE)FileHandle; + // This will cause the file to be deleted when the last handle to it is closed + FILE_DISPOSITION_INFO Fdi{}; + Fdi.DeleteFile = TRUE; + BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + + if (!Success) + { + // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed + // to delete it. + ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", + PathFromHandle(WinFileHandle), + GetLastErrorAsString()); + } +} +#endif + CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { @@ -134,10 +150,12 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Mode == CasStore::InsertMode::kCopyOnly) { - ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - if (std::filesystem::is_regular_file(Name.ShardedPath.ToPath())) { - return {.New = false}; + RwLock::SharedLockScope _(m_Lock); + if (m_KnownEntries.contains(ChunkHash)) + { + return CasStore::InsertResult{.New = false}; + } } return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); } @@ -148,33 +166,40 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { + { + bool Exists = true; + { + RwLock::SharedLockScope _(m_Lock); + Exists = m_KnownEntries.contains(ChunkHash); + } + if (Exists) + { +#if ZEN_PLATFORM_WINDOWS + DeletePayloadFileOnClose(FileRef.FileHandle); +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); + if (unlink(FilePath.c_str()) < 0) + { + int UnlinkError = zen::GetLastError(); + if (UnlinkError != ENOENT) + { + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + FilePath.string(), + GetSystemErrorAsString(UnlinkError)); + } + } +#endif + return CasStore::InsertResult{.New = false}; + } + } + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); - RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS const HANDLE ChunkFileHandle = FileRef.FileHandle; - - auto DeletePayloadFileOnClose = [&] { - // This will cause the file to be deleted when the last handle to it is closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'", - PathFromHandle(ChunkFileHandle), - GetLastErrorAsString()); - } - }; - // See if file already exists - // - // Future improvement: maintain Bloom filter to avoid expensive file system probes? - { CAtlFile PayloadFile; @@ -184,20 +209,33 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: // and should contain the content we were about to insert // We do need to ensure the source file goes away on close, however - - uint64_t FileSize = 0; - if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes)) + size_t ChunkSize = Chunk.GetSize(); + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) { - m_TotalSize.fetch_add(static_cast<int64_t>(FileSize)); + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(Chunk.Size()), std::memory_order::relaxed); + } + + DeletePayloadFileOnClose(ChunkFileHandle); + + return CasStore::InsertResult{.New = IsNew}; } else { - ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8()); + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); } - - DeletePayloadFileOnClose(); - - return CasStore::InsertResult{.New = false}; } else { @@ -240,7 +278,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); // Try to move file into place - BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); if (!Success) @@ -286,31 +323,53 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Success) { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); - return CasStore::InsertResult{.New = true}; + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + + return CasStore::InsertResult{.New = IsNew}; } const DWORD LastError = GetLastError(); if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) { - DeletePayloadFileOnClose(); + HashLock.ReleaseNow(); + DeletePayloadFileOnClose(ChunkFileHandle); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } - return CasStore::InsertResult{.New = false}; + return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LastError), ChunkHash); - DeletePayloadFileOnClose(); + DeletePayloadFileOnClose(ChunkFileHandle); #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); if (Ret < 0 && zen::GetLastError() == ENOENT) { // Destination directory doesn't exist. Create it any try again. @@ -319,14 +378,14 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } int LinkError = zen::GetLastError(); - // Unlink the file. If the path to unlink didn't exist someone else - // beat us to it and that is hunky-dory. if (unlink(SourcePath.c_str()) < 0) { int UnlinkError = zen::GetLastError(); if (UnlinkError != ENOENT) { - ZEN_WARN("unlink of CAS payload file failed ('{}')", GetSystemErrorAsString(UnlinkError)); + ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", + SourcePath.string(), + GetSystemErrorAsString(UnlinkError)); } } @@ -336,7 +395,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { if (LinkError == EEXIST) { - return CasStore::InsertResult{.New = false}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}", @@ -345,7 +414,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } else { - return CasStore::InsertResult{.New = true}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } #endif // ZEN_PLATFORM_* } @@ -361,8 +440,6 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); // See if file already exists - // - // Future improvement: maintain Bloom filter to avoid expensive file system probes? #if ZEN_PLATFORM_WINDOWS CAtlFile PayloadFile; @@ -374,8 +451,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the // content we were about to insert - m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize)); - return CasStore::InsertResult{.New = false}; + bool IsNew = false; + { + RwLock::ExclusiveLockScope _(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; } PayloadFile.Close(); @@ -386,7 +471,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize } #endif - RwLock::ExclusiveLockScope _(LockForHash(ChunkHash)); + RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash)); #if ZEN_PLATFORM_WINDOWS // For now, use double-checked locking to see if someone else was first @@ -395,11 +480,31 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize if (SUCCEEDED(hRes)) { - // If we succeeded in opening the file then we don't need to do anything - // else because someone else managed to create the file before we did. Just return. + uint64_t FileSize = 0; + if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize) + { + // If we succeeded in opening the file then and the size is correct we don't need to do anything + // else because someone else managed to create the file before we did. Just return. - m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize)); - return {.New = false}; + HashLock.ReleaseNow(); + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return CasStore::InsertResult{.New = IsNew}; + } + else + { + ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite", + Name.ShardedPath.ToUtf8(), + ChunkSize, + FileSize); + } } if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))) @@ -441,7 +546,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { case EEXIST: // Another thread has beat us to it so we're golden. - return {.New = false}; + { + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + return {.New = IsNew}; + } + break; case ENOENT: if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len))) @@ -455,7 +574,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath)); default: - ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath)); + ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8())); } } @@ -492,10 +611,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // *after* the lock is released due to the initialization order PayloadFile.Close(); - m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed); m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize}); - return {.New = true}; + HashLock.ReleaseNow(); + + bool IsNew = false; + { + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_KnownEntries.insert(ChunkHash).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); + } + + return {.New = IsNew}; } IoBuffer @@ -546,7 +676,10 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) if (!Ec) { - m_TotalSize.fetch_sub(FileSize); + if (m_KnownEntries.erase(ChunkHash) == 1u) + { + m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed); + } m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize}); } } @@ -721,6 +854,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; std::vector<IoHash> CandidateCas; + CandidateCas.resize(1); uint64_t DeletedCount = 0; uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed); @@ -737,15 +871,22 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) }); IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { - bool KeepThis = false; - CandidateCas.clear(); - CandidateCas.push_back(Hash); + bool KeepThis = false; + CandidateCas[0] = Hash; GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { ZEN_UNUSED(Hash); KeepThis = true; }); const uint64_t FileSize = Payload.FileSize(); + // Is this a file we did not track previously? + { + RwLock::ExclusiveLockScope _(m_Lock); + if (m_KnownEntries.insert(Hash).second) + { + m_TotalSize.fetch_add(FileSize, std::memory_order_relaxed); + } + } if (!KeepThis) { diff --git a/zenstore/filecas.h b/zenstore/filecas.h index de79b8b81..29c28560e 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -33,7 +33,6 @@ struct FileCasStrategy final : public GcStorage ~FileCasStrategy(); void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); - CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace); @@ -46,13 +45,16 @@ struct FileCasStrategy final : public GcStorage virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; } private: - std::filesystem::path m_RootDirectory; - RwLock m_Lock; - RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines - spdlog::logger& m_Log; - spdlog::logger& Log() { return m_Log; } - std::atomic_uint64_t m_TotalSize{}; - bool m_IsInitialized = false; + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); + + std::filesystem::path m_RootDirectory; + RwLock m_Lock; + std::unordered_set<IoHash, IoHash::Hasher> m_KnownEntries; + RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + std::atomic_uint64_t m_TotalSize{}; + bool m_IsInitialized = false; struct FileCasIndexEntry { diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 0902abf4a..8aac65bb4 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -908,7 +908,9 @@ TEST_CASE("gc.full") CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8]))); auto FinalSize = CasStore->TotalSize(); - CHECK(InitialSize.TinySize == FinalSize.TinySize); + + CHECK_LE(InitialSize.TinySize, FinalSize.TinySize); + CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize); } #endif diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index 9f16063a0..5ef2d4694 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -96,7 +96,6 @@ struct BlockStoreFile : public RefCounted IoBuffer GetChunk(uint64_t Offset, uint64_t Size); void Read(void* Data, uint64_t Size, uint64_t FileOffset); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Truncate(uint64_t Size); void Flush(); BasicFile& GetBasicFile(); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); @@ -153,6 +152,8 @@ public: static const char* GetBlockFileExtension(); static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); + inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } + private: std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks; @@ -165,6 +166,8 @@ private: uint64_t m_MaxBlockSize = 1u << 28; uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1; std::filesystem::path m_BlocksBasePath; + + std::atomic_uint64_t m_TotalSize{}; }; void blockstore_forcelink(); |