diff options
| author | Dan Engelbrecht <[email protected]> | 2023-10-03 13:31:02 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-03 13:31:02 +0200 |
| commit | 68a72b68592c416969bd36f413eb2b2762b9fcff (patch) | |
| tree | 9a5fc28eb9040f010c92f86a1745f9418dfc91ca /src | |
| parent | clean up date formatting (#440) (diff) | |
| download | zen-68a72b68592c416969bd36f413eb2b2762b9fcff.tar.xz zen-68a72b68592c416969bd36f413eb2b2762b9fcff.zip | |
faster accesstime save restore (#439)
- Improvement: Reduce time a cache bucket is locked for write when flushing/garbage collecting
- Change format for faster read/write and reduced size on disk
- Don't lock index while writing manifest to disk
- Skip garbage collect if we are currently in a Flush operation
- BlockStore::Flush no longer terminates currently writing block
- Garbage collect references to currently writing block but keep the block as new data may be added
- Fix BlockStore::Prune used disk space calculation
- Don't materialize data in filecas when we just need the size
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 300 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 22 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 62 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 66 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 84 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 145 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 23 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 1 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 2 |
9 files changed, 426 insertions, 279 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 98a24116f..9883e2119 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -238,37 +238,90 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo const auto _ = MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (CbFieldView Entry : Manifest["Timestamps"sv]) + uint64_t Count = Manifest["Count"sv].AsUInt64(0); + if (Count != 0) { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) + std::vector<size_t> KeysIndexes; + KeysIndexes.reserve(Count); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) + { + if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) + { + KeysIndexes.push_back(It.value()); + continue; + } + KeysIndexes.push_back((uint64_t)-1); + } + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + for (CbFieldView& RawHashView : RawHashArray) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawHash = RawHashView.AsHash(); + } + KeyIndexOffset = 0; + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + for (CbFieldView& RawSizeView : RawSizeArray) + { + size_t KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex == (uint64_t)-1) + { + continue; + } + m_Payloads[KeyIndex].RawSize = RawSizeView.AsUInt64(); } } - for (CbFieldView Entry : Manifest["RawInfo"sv]) + + ////// Legacy format read { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) + for (CbFieldView Entry : Manifest["Timestamps"sv]) { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); - if (RawHash == IoHash::Zero || RawSize == 0) + if (auto It = m_Index.find(Key); It != m_Index.end()) { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); + m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); } + } + for (CbFieldView Entry : Manifest["RawInfo"sv]) + { + const CbObjectView Obj = Entry.AsObjectView(); + const IoHash Key = Obj["Key"sv].AsHash(); + if (auto It = m_Index.find(Key); It != m_Index.end()) + { + size_t EntryIndex = It.value(); + ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - m_Payloads[EntryIndex].RawHash = RawHash; - m_Payloads[EntryIndex].RawSize = RawSize; + const IoHash RawHash = Obj["RawHash"sv].AsHash(); + const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); + + if (RawHash == IoHash::Zero || RawSize == 0) + { + ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); + } + + m_Payloads[EntryIndex].RawHash = RawHash; + m_Payloads[EntryIndex].RawSize = RawSize; + } } } } @@ -578,14 +631,17 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex); if (BlockIt == BlockSizes.end()) { - ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Unknown block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir); } else { uint64_t BlockSize = BlockIt->second; if (BlockLocation.Offset + BlockLocation.Size > BlockSize) { - ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Range is outside of block {} for entry {} in '{}'", + BlockLocation.BlockIndex, + Entry.first.ToHexString(), + m_BucketDir); } else { @@ -783,21 +839,50 @@ void ZenCacheDiskLayer::CacheBucket::Flush() { ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush"); + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - m_BlockStore.Flush(); - - RwLock::SharedLockScope _(m_IndexLock); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - MakeIndexSnapshot(); - SaveManifest(); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } void -ZenCacheDiskLayer::CacheBucket::SaveManifest() +ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest) +{ + ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + try + { + SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Manifest); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + } +} + +CbObject +ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads) { using namespace std::literals; - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); CbObjectWriter Writer; Writer << "BucketId"sv << m_BucketId; @@ -805,46 +890,40 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest() if (!m_Index.empty()) { - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : m_Index) + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) { - const IoHash& Key = Kv.first; - GcClock::Tick AccessTime = m_AccessTimes[Kv.second]; + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "LastAccess"sv << AccessTime; - Writer.EndObject(); + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); } Writer.EndArray(); - Writer.BeginArray("RawInfo"sv); + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) { - for (auto& Kv : m_Index) - { - const IoHash& Key = Kv.first; - const BucketPayload& Payload = m_Payloads[Kv.second]; - if (Payload.RawHash != IoHash::Zero) - { - Writer.BeginObject(); - Writer << "Key"sv << Key; - Writer << "RawHash"sv << Payload.RawHash; - Writer << "RawSize"sv << Payload.RawSize; - Writer.EndObject(); - } - } + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddHash(Payload.RawHash); } Writer.EndArray(); - } - try - { - SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save()); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what()); + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const BucketPayload& Payload = Payloads[Kv.second]; + Writer.AddInteger(Payload.RawSize); + } + Writer.EndArray(); } + return Writer.Save(); } IoHash @@ -1200,7 +1279,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx) ExpiredKeys.reserve(1024); std::vector<IoHash> Cids; - Cids.reserve(1024); + if (!GcCtx.SkipCid()) + { + Cids.reserve(1024); + } for (const auto& Entry : Index) { @@ -1298,8 +1380,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) MovedCount, TotalChunkCount, NiceBytes(OldTotalSize)); - RwLock::SharedLockScope _(m_IndexLock); - SaveManifest(); + + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) + { + return; + } + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + IndexMap Index; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + MakeIndexSnapshot(); + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); }); m_SlogFile.Flush(); @@ -1360,48 +1459,63 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) IndexMap Index; BlockStore::ReclaimSnapshotState BlockStoreState; { - ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); - - RwLock::SharedLockScope __(m_IndexLock); - Stopwatch Timer; - const auto ____ = MakeGuard([&] { - uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); - WriteBlockTimeUs += ElapsedUs; - WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); - }); - if (m_Index.empty()) + bool Expected = false; + if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true)) { - ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir); return; } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); - - SaveManifest(); - Index = m_Index; + auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); - for (const IoHash& Key : DeleteCacheKeys) + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; { - if (auto It = Index.find(Key); It != Index.end()) + ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State"); + RwLock::SharedLockScope IndexLock(m_IndexLock); + + Stopwatch Timer; + const auto ____ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + if (m_Index.empty()) { - const BucketPayload& Payload = m_Payloads[It->second]; - DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; - if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir); + return; + } + + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + Index = m_Index; + + for (const IoHash& Key : DeleteCacheKeys) + { + if (auto It = Index.find(Key); It != Index.end()) { - Entry.Location.Flags |= DiskLocation::kTombStone; - ExpiredStandaloneEntries.push_back(Entry); + const BucketPayload& Payload = m_Payloads[It->second]; + DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location}; + if (Entry.Location.Flags & DiskLocation::kStandaloneFile) + { + Entry.Location.Flags |= DiskLocation::kTombStone; + ExpiredStandaloneEntries.push_back(Entry); + } } } - } - if (GcCtx.IsDeletionMode()) - { - for (const auto& Entry : ExpiredStandaloneEntries) + if (GcCtx.IsDeletionMode()) { - m_Index.erase(Entry.Key); - m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); - DeletedChunks.insert(Entry.Key); + for (const auto& Entry : ExpiredStandaloneEntries) + { + m_Index.erase(Entry.Key); + m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed); + DeletedChunks.insert(Entry.Key); + } + m_SlogFile.Append(ExpiredStandaloneEntries); } - m_SlogFile.Append(ExpiredStandaloneEntries); } + SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads))); } if (GcCtx.IsDeletionMode()) diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 80c643afa..c4bedfee8 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -188,6 +188,7 @@ private: BlockStore m_BlockStore; Oid m_BucketId; uint64_t m_LargeObjectThreshold = 128 * 1024; + std::atomic_bool m_IsFlushing{}; // These files are used to manage storage of small objects for this bucket @@ -221,16 +222,17 @@ private: std::atomic_uint64_t m_TotalStandaloneSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); - uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); - void OpenLog(const bool IsNew); - void SaveManifest(); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); + void OpenLog(const bool IsNew); + CbObject MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads); + void SaveManifest(CbObject&& Manifest); CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const; // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 0a2947b16..1b6eeca3a 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -1030,46 +1030,50 @@ TEST_CASE("z$.gc") { ScopedTemporaryDirectory TempDir; GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); - const auto Bucket = "rightintwo"sv; - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) { - IoBuffer Value = testutils::CreateBinaryCacheValue(128); - Zcs.Put(Bucket, Key, {.Value = Value}); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2)); - GcCtx.CollectSmallObjects(true); + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); + const auto Bucket = "rightintwo"sv; - Gc.CollectGarbage(GcCtx); + std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; for (const auto& Key : Keys) { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); + IoBuffer Value = testutils::CreateBinaryCacheValue(128); + Zcs.Put(Bucket, Key, {.Value = Value}); } - } - // Move forward in time and collect again - { - GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); + { + GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2)); + GcCtx.CollectSmallObjects(true); - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); + Gc.CollectGarbage(GcCtx); - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(Exists); + } } + // Move forward in time and collect again + { + GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2)); + GcCtx.CollectSmallObjects(true); + + Zcs.Flush(); + Gc.CollectGarbage(GcCtx); + + for (const auto& Key : Keys) + { + ZenCacheValue CacheValue; + const bool Exists = Zcs.Get(Bucket, Key, CacheValue); + CHECK(!Exists); + } + } + } + { + ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache"); CHECK_EQ(0, Zcs.StorageSize().DiskSize); } } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 4ddbdded7..4402e4486 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1239,15 +1239,37 @@ ProjectStore::Project::ReadAccessTimes() if (ValidationError == CbValidateError::None) { - CbObject Reader = LoadCompactBinaryObject(Obj); - CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); + CbObject Reader = LoadCompactBinaryObject(Obj); - for (CbFieldView& Entry : LastAccessTimes) + uint64_t Count = Reader["count"sv].AsUInt64(0); + if (Count > 0) { - CbObjectView AccessTime = Entry.AsObjectView(); - std::string_view Id = AccessTime["id"sv].AsString(); - GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); - m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); + std::vector<uint64_t> Ticks; + Ticks.reserve(Count); + CbArrayView TicksArray = Reader["ticks"sv].AsArrayView(); + for (CbFieldView& TickView : TicksArray) + { + Ticks.emplace_back(TickView.AsUInt64()); + } + CbArrayView IdArray = Reader["ids"sv].AsArrayView(); + uint64_t Index = 0; + for (CbFieldView& IdView : IdArray) + { + std::string_view Id = IdView.AsString(); + m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]); + } + } + + ////// Legacy format read + { + CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView(); + for (CbFieldView& Entry : LastAccessTimes) + { + CbObjectView AccessTime = Entry.AsObjectView(); + std::string_view Id = AccessTime["id"sv].AsString(); + GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64(); + m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick); + } } } else @@ -1261,26 +1283,27 @@ ProjectStore::Project::WriteAccessTimes() { using namespace std::literals; - RwLock::ExclusiveLockScope _(m_ProjectLock); + CbObjectWriter Writer; - BinaryWriter Mem; + Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size())); + Writer.BeginArray("ids"); - CbObjectWriter Writer; - Writer.BeginArray("lastaccess"); { + RwLock::SharedLockScope _(m_ProjectLock); for (const auto& It : m_LastAccessTimes) { - Writer.BeginObject(); - { - Writer << "id"sv << It.first; - Writer << "tick"sv << gsl::narrow<uint64_t>(It.second); - } - Writer.EndObject(); + Writer << It.first; } + Writer.EndArray(); + Writer.BeginArray("ticks"); + for (const auto& It : m_LastAccessTimes) + { + Writer << gsl::narrow<uint64_t>(It.second); + } + Writer.EndArray(); } - Writer.EndArray(); - Writer.Save(Mem); + CbObject Data = Writer.Save(); try { @@ -1290,10 +1313,7 @@ ProjectStore::Project::WriteAccessTimes() ZEN_INFO("persisting access times for project '{}' to {}", Identifier, ProjectAccessTimesFilePath); - BasicFile Blob; - Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kTruncate); - Blob.Write(Mem.Data(), Mem.Size(), 0); - Blob.Flush(); + WriteFile(ProjectAccessTimesFilePath, Data.GetBuffer().AsIoBuffer()); } catch (std::exception& Err) { diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index b5ed17fc6..f99b0bc4a 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -233,7 +233,6 @@ BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations) if (!KnownBlocks.contains(BlockIndex)) { Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex]; - m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); BlocksToDelete.push_back(BlockIndex); } } @@ -242,6 +241,7 @@ BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations) { // Clear out unused blocks Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex]; + m_TotalSize.fetch_sub(BlockFile->FileSize(), std::memory_order::relaxed); m_ChunkBlocks.erase(BlockIndex); ZEN_DEBUG("marking block store file '{}' for delete, block #{}", BlockFile->GetPath(), BlockIndex); BlockFile->MarkAsDeleteOnClose(); @@ -354,22 +354,31 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const } void -BlockStore::Flush() +BlockStore::Flush(bool ForceNewBlock) { ZEN_TRACE_CPU("BlockStore::Flush"); - RwLock::ExclusiveLockScope _(m_InsertLock); - if (m_CurrentInsertOffset > 0) + if (ForceNewBlock) { - uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); - WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); - if (m_WriteBlock) + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) { - m_WriteBlock->Flush(); + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + if (m_WriteBlock) + { + m_WriteBlock->Flush(); + } + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; } - m_WriteBlock = nullptr; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); - m_CurrentInsertOffset = 0; + return; + } + RwLock::SharedLockScope _(m_InsertLock); + if (m_WriteBlock) + { + m_WriteBlock->Flush(); } } @@ -449,11 +458,6 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, { const BlockStoreLocation& Location = ChunkLocations[Index]; OldTotalSize += Location.Size; - if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) - { - continue; - } - auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); size_t ChunkMapIndex = 0; if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) @@ -524,9 +528,12 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, uint32_t NewBlockIndex = 0; for (uint32_t BlockIndex : BlocksToReWrite) { + bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex); + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; Ref<BlockStoreFile> OldBlockFile; + if (!IsActiveWriteBlock) { RwLock::SharedLockScope _i(m_InsertLock); Stopwatch Timer; @@ -553,6 +560,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } ChangeCallback({}, DeleteMap); DeletedCount += DeleteMap.size(); + if (OldBlockFile) { RwLock::ExclusiveLockScope _i(m_InsertLock); Stopwatch Timer; @@ -561,18 +569,15 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - if (OldBlockFile) - { - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); - m_ChunkBlocks.erase(BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); } continue; } - else if (!OldBlockFile) + else if (!OldBlockFile && !IsActiveWriteBlock) { // If the block file pointed to does not exist, move any keep chunk them to deleted list ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.", @@ -585,6 +590,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } MovedChunksArray MovedChunks; + if (OldBlockFile) { ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock"); std::vector<uint8_t> Chunk; @@ -689,9 +695,11 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, } ChangeCallback(MovedChunks, DeleteMap); - MovedCount += KeepMap.size(); + MovedCount += MovedChunks.size(); DeletedCount += DeleteMap.size(); MovedChunks.clear(); + + if (OldBlockFile) { RwLock::ExclusiveLockScope __(m_InsertLock); Stopwatch Timer; @@ -700,14 +708,12 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, ReadBlockTimeUs += ElapsedUs; ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); }); - if (OldBlockFile) - { - ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); - ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); - m_ChunkBlocks.erase(BlockIndex); - m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); - OldBlockFile->MarkAsDeleteOnClose(); - } + + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile); + m_ChunkBlocks.erase(BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); } } if (NewBlockFile) @@ -1117,7 +1123,7 @@ TEST_CASE("blockstore.clean.stray.blocks") CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); } -TEST_CASE("blockstore.flush.forces.new.block") +TEST_CASE("blockstore.flush.force.new.block") { using namespace blockstore::impl; @@ -1129,10 +1135,10 @@ TEST_CASE("blockstore.flush.forces.new.block") std::string FirstChunkData = "This is the data of the first chunk that we will write"; WriteStringAsChunk(Store, FirstChunkData, 4); - Store.Flush(); + Store.Flush(/*ForceNewBlock*/ true); std::string SecondChunkData = "This is the data for the second chunk that we will write"; WriteStringAsChunk(Store, SecondChunkData, 4); - Store.Flush(); + Store.Flush(/*ForceNewBlock*/ true); std::string ThirdChunkData = "This is a much longer string that will not fit in the first block so it should be placed in the second block"; WriteStringAsChunk(Store, ThirdChunkData, 4); @@ -1157,7 +1163,7 @@ TEST_CASE("blockstore.iterate.chunks") std::string SecondChunkData = "This is the data for the second chunk that we will write"; BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); - Store.Flush(); + Store.Flush(/*ForceNewBlock*/ false); std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); @@ -1267,7 +1273,7 @@ TEST_CASE("blockstore.reclaim.space") ChunksToKeep.push_back(ChunkIndex); } - Store.Flush(); + Store.Flush(/*ForceNewBlock*/ false); BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 1d1797597..ce2e53527 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -230,7 +230,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks) void CasContainerStrategy::Flush() { - m_BlockStore.Flush(); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_CasLog.Flush(); MakeIndexSnapshot(); } @@ -801,7 +801,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) auto BlockIt = BlockSizes.find(DiskLocation.GetBlockIndex()); if (BlockIt == BlockSizes.end()) { - ZEN_WARN("Unknown block {} for entry {}", DiskLocation.GetBlockIndex(), Entry.first.ToHexString()); + ZEN_WARN("Unknown block {} for entry {} in '{}'", DiskLocation.GetBlockIndex(), Entry.first.ToHexString(), BasePath); } else { @@ -810,7 +810,10 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) uint64_t BlockSize = BlockIt->second; if (BlockLocation.Offset + BlockLocation.Size > BlockSize) { - ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString()); + ZEN_WARN("Range is outside of block {} for entry {} in '{}'", + BlockLocation.BlockIndex, + Entry.first.ToHexString(), + BasePath); } else { @@ -1068,7 +1071,6 @@ TEST_CASE("compactcas.gc.removefile") TEST_CASE("compactcas.gc.compact") { - // for (uint32_t i = 0; i < 100; ++i) { ScopedTemporaryDirectory TempDir; @@ -1111,6 +1113,17 @@ TEST_CASE("compactcas.gc.compact") CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); + auto ValidateChunkExists = [&](size_t Index) { + IoBuffer Chunk = Cas.FindChunk(ChunkHashes[Index]); + bool Exists = !!Chunk; + CHECK(Exists); + IoHash Hash = IoHash::HashBuffer(Chunk); + if (ChunkHashes[Index] != Hash) + { + CHECK(fmt::format("{}", ChunkHashes[Index]) == fmt::format("{}", Hash)); + } + }; + // Keep first and last { GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); @@ -1134,8 +1147,8 @@ TEST_CASE("compactcas.gc.compact") CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + ValidateChunkExists(0); + ValidateChunkExists(8); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); @@ -1167,7 +1180,7 @@ TEST_CASE("compactcas.gc.compact") CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + ValidateChunkExists(8); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); @@ -1201,9 +1214,9 @@ TEST_CASE("compactcas.gc.compact") CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(!Cas.HaveChunk(ChunkHashes[8])); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); + ValidateChunkExists(1); + ValidateChunkExists(4); + ValidateChunkExists(7); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[2], ChunkHashes[2]); @@ -1236,9 +1249,9 @@ TEST_CASE("compactcas.gc.compact") CHECK(Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + ValidateChunkExists(6); + ValidateChunkExists(7); + ValidateChunkExists(8); Cas.InsertChunk(Chunks[0], ChunkHashes[0]); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); @@ -1273,11 +1286,11 @@ TEST_CASE("compactcas.gc.compact") CHECK(!Cas.HaveChunk(ChunkHashes[7])); CHECK(Cas.HaveChunk(ChunkHashes[8])); - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + ValidateChunkExists(0); + ValidateChunkExists(2); + ValidateChunkExists(4); + ValidateChunkExists(6); + ValidateChunkExists(8); Cas.InsertChunk(Chunks[1], ChunkHashes[1]); Cas.InsertChunk(Chunks[3], ChunkHashes[3]); @@ -1286,15 +1299,15 @@ TEST_CASE("compactcas.gc.compact") } // Verify that we nicely appended blocks even after all GC operations - CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0]))); - CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1]))); - CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2]))); - CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3]))); - CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4]))); - CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5]))); - CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6]))); - CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7]))); - CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8]))); + ValidateChunkExists(0); + ValidateChunkExists(1); + ValidateChunkExists(2); + ValidateChunkExists(3); + ValidateChunkExists(4); + ValidateChunkExists(5); + ValidateChunkExists(6); + ValidateChunkExists(7); + ValidateChunkExists(8); } } @@ -1497,6 +1510,7 @@ TEST_CASE("compactcas.threadedinsert") IoBuffer Chunk = CreateRandomChunk(kChunkSize); IoHash Hash = HashBuffer(Chunk); NewChunks[Hash] = Chunk; + GcChunkHashes.insert(Hash); } std::atomic_uint32_t AddedChunkCount; @@ -1522,42 +1536,40 @@ TEST_CASE("compactcas.threadedinsert") }); } - while (AddedChunkCount.load() < NewChunks.size()) + std::unordered_set<IoHash, IoHash::Hasher> ChunksToDelete; + std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); + size_t C = 0; + while (C < KeepHashes.size()) { - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) + if (C % 155 == 0) { - if (Cas.HaveChunk(Chunk.first)) + if (C < KeepHashes.size() - 1) { - GcChunkHashes.emplace(Chunk.first); + ChunksToDelete.insert(KeepHashes[C]); + KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) + if (C + 3 < KeepHashes.size() - 1) { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } + ChunksToDelete.insert(KeepHashes[C + 3]); + KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; + KeepHashes.pop_back(); } - C++; } + C++; + } + while (AddedChunkCount.load() < NewChunks.size()) + { GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); GcCtx.CollectSmallObjects(true); GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) { + CHECK(ChunksToDelete.contains(ChunkHash)); + GcChunkHashes.erase(ChunkHash); + }); } while (WorkCompleted < NewChunks.size() + Chunks.size()) @@ -1565,40 +1577,15 @@ TEST_CASE("compactcas.threadedinsert") Sleep(1); } - // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope - for (const auto& Chunk : NewChunks) - { - if (Cas.HaveChunk(Chunk.first)) - { - GcChunkHashes.emplace(Chunk.first); - } - } - std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end()); - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); GcCtx.CollectSmallObjects(true); GcCtx.AddRetainedCids(KeepHashes); Cas.CollectGarbage(GcCtx); const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); + Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) { + CHECK(ChunksToDelete.contains(ChunkHash)); + GcChunkHashes.erase(ChunkHash); + }); } { WorkCompleted = 0; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 0d742d7e1..fe568a487 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -795,6 +795,21 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& } void +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback) +{ + ZEN_TRACE_CPU("FileCas::IterateChunks"); + + ZEN_ASSERT(m_IsInitialized); + + RwLock::SharedLockScope _(m_Lock); + for (const auto& It : m_Index) + { + const IoHash& NameHash = It.first; + Callback(NameHash, It.second.Size); + } +} + +void FileCasStrategy::Flush() { ZEN_TRACE_CPU("FileCas::Flush"); @@ -927,7 +942,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("FileCas::CollectGarbage::Filter"); - IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) { + IterateChunks([&](const IoHash& Hash, uint64_t Size) { bool KeepThis = false; CandidateCas[0] = Hash; GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) { @@ -935,16 +950,14 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx) KeepThis = true; }); - const uint64_t FileSize = Payload.GetSize(); - if (!KeepThis) { ChunksToDelete.push_back(Hash); - ChunksToDeleteBytes.fetch_add(FileSize); + ChunksToDeleteBytes.fetch_add(Size); } ++ChunkCount; - ChunkBytes.fetch_add(FileSize); + ChunkBytes.fetch_add(Size); }); } diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index d9186f05b..10c181c0b 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -86,6 +86,7 @@ private: inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback); + void IterateChunks(std::function<void(const IoHash& Hash, uint64_t PayloadSize)>&& Callback); void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); struct ShardingHelper diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index edd6df5a2..6fc0652f2 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -139,7 +139,7 @@ public: void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback); IoBuffer TryGetChunk(const BlockStoreLocation& Location) const; - void Flush(); + void Flush(bool ForceNewBlock); ReclaimSnapshotState GetReclaimSnapshotState(); void ReclaimSpace( |