aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-10-03 13:31:02 +0200
committerGitHub <[email protected]>2023-10-03 13:31:02 +0200
commit68a72b68592c416969bd36f413eb2b2762b9fcff (patch)
tree9a5fc28eb9040f010c92f86a1745f9418dfc91ca /src
parentclean up date formatting (#440) (diff)
downloadzen-68a72b68592c416969bd36f413eb2b2762b9fcff.tar.xz
zen-68a72b68592c416969bd36f413eb2b2762b9fcff.zip
faster accesstime save restore (#439)
- Improvement: Reduce time a cache bucket is locked for write when flushing/garbage collecting - Change format for faster read/write and reduced size on disk - Don't lock index while writing manifest to disk - Skip garbage collect if we are currently in a Flush operation - BlockStore::Flush no longer terminates currently writing block - Garbage collect references to currently writing block but keep the block as new data may be added - Fix BlockStore::Prune used disk space calculation - Don't materialize data in filecas when we just need the size
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp300
-rw-r--r--src/zenserver/cache/cachedisklayer.h22
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp62
-rw-r--r--src/zenserver/projectstore/projectstore.cpp66
-rw-r--r--src/zenstore/blockstore.cpp84
-rw-r--r--src/zenstore/compactcas.cpp145
-rw-r--r--src/zenstore/filecas.cpp23
-rw-r--r--src/zenstore/filecas.h1
-rw-r--r--src/zenstore/include/zenstore/blockstore.h2
9 files changed, 426 insertions, 279 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index 98a24116f..9883e2119 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -238,37 +238,90 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
const auto _ =
MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- for (CbFieldView Entry : Manifest["Timestamps"sv])
+ uint64_t Count = Manifest["Count"sv].AsUInt64(0);
+ if (Count != 0)
{
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
-
- if (auto It = m_Index.find(Key); It != m_Index.end())
+ std::vector<size_t> KeysIndexes;
+ KeysIndexes.reserve(Count);
+ CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView();
+ for (CbFieldView& KeyView : KeyArray)
+ {
+ if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end())
+ {
+ KeysIndexes.push_back(It.value());
+ continue;
+ }
+ KeysIndexes.push_back((uint64_t)-1);
+ }
+ size_t KeyIndexOffset = 0;
+ CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView();
+ for (CbFieldView& TimeStampView : TimeStampArray)
+ {
+ size_t KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex == (uint64_t)-1)
+ {
+ continue;
+ }
+ m_AccessTimes[KeyIndex] = TimeStampView.AsInt64();
+ }
+ KeyIndexOffset = 0;
+ CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView();
+ for (CbFieldView& RawHashView : RawHashArray)
{
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
- m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
+ size_t KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex == (uint64_t)-1)
+ {
+ continue;
+ }
+ m_Payloads[KeyIndex].RawHash = RawHashView.AsHash();
+ }
+ KeyIndexOffset = 0;
+ CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView();
+ for (CbFieldView& RawSizeView : RawSizeArray)
+ {
+ size_t KeyIndex = KeysIndexes[KeyIndexOffset++];
+ if (KeyIndex == (uint64_t)-1)
+ {
+ continue;
+ }
+ m_Payloads[KeyIndex].RawSize = RawSizeView.AsUInt64();
}
}
- for (CbFieldView Entry : Manifest["RawInfo"sv])
+
+ ////// Legacy format read
{
- const CbObjectView Obj = Entry.AsObjectView();
- const IoHash Key = Obj["Key"sv].AsHash();
- if (auto It = m_Index.find(Key); It != m_Index.end())
+ for (CbFieldView Entry : Manifest["Timestamps"sv])
{
- size_t EntryIndex = It.value();
- ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
-
- const IoHash RawHash = Obj["RawHash"sv].AsHash();
- const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
+ const CbObjectView Obj = Entry.AsObjectView();
+ const IoHash Key = Obj["Key"sv].AsHash();
- if (RawHash == IoHash::Zero || RawSize == 0)
+ if (auto It = m_Index.find(Key); It != m_Index.end())
{
- ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size());
+ m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64();
}
+ }
+ for (CbFieldView Entry : Manifest["RawInfo"sv])
+ {
+ const CbObjectView Obj = Entry.AsObjectView();
+ const IoHash Key = Obj["Key"sv].AsHash();
+ if (auto It = m_Index.find(Key); It != m_Index.end())
+ {
+ size_t EntryIndex = It.value();
+ ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size());
- m_Payloads[EntryIndex].RawHash = RawHash;
- m_Payloads[EntryIndex].RawSize = RawSize;
+ const IoHash RawHash = Obj["RawHash"sv].AsHash();
+ const uint64_t RawSize = Obj["RawSize"sv].AsUInt64();
+
+ if (RawHash == IoHash::Zero || RawSize == 0)
+ {
+ ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex);
+ }
+
+ m_Payloads[EntryIndex].RawHash = RawHash;
+ m_Payloads[EntryIndex].RawSize = RawSize;
+ }
}
}
}
@@ -578,14 +631,17 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew)
auto BlockIt = BlockSizes.find(BlockLocation.BlockIndex);
if (BlockIt == BlockSizes.end())
{
- ZEN_WARN("Unknown block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ ZEN_WARN("Unknown block {} for entry {} in '{}'", BlockLocation.BlockIndex, Entry.first.ToHexString(), m_BucketDir);
}
else
{
uint64_t BlockSize = BlockIt->second;
if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
{
- ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ ZEN_WARN("Range is outside of block {} for entry {} in '{}'",
+ BlockLocation.BlockIndex,
+ Entry.first.ToHexString(),
+ m_BucketDir);
}
else
{
@@ -783,21 +839,50 @@ void
ZenCacheDiskLayer::CacheBucket::Flush()
{
ZEN_TRACE_CPU("Z$::Disk::Bucket::Flush");
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
- m_BlockStore.Flush();
-
- RwLock::SharedLockScope _(m_IndexLock);
+ m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
- MakeIndexSnapshot();
- SaveManifest();
+
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ IndexMap Index;
+
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ MakeIndexSnapshot();
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ }
+ SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads)));
}
void
-ZenCacheDiskLayer::CacheBucket::SaveManifest()
+ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest)
+{
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest");
+ try
+ {
+ SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Manifest);
+ }
+ catch (std::exception& Err)
+ {
+ ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what());
+ }
+}
+
+CbObject
+ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads)
{
using namespace std::literals;
- ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest");
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest");
CbObjectWriter Writer;
Writer << "BucketId"sv << m_BucketId;
@@ -805,46 +890,40 @@ ZenCacheDiskLayer::CacheBucket::SaveManifest()
if (!m_Index.empty())
{
- Writer.BeginArray("Timestamps"sv);
- for (auto& Kv : m_Index)
+ Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size()));
+ Writer.BeginArray("Keys"sv);
+ for (auto& Kv : Index)
{
- const IoHash& Key = Kv.first;
- GcClock::Tick AccessTime = m_AccessTimes[Kv.second];
+ const IoHash& Key = Kv.first;
+ Writer.AddHash(Key);
+ }
+ Writer.EndArray();
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "LastAccess"sv << AccessTime;
- Writer.EndObject();
+ Writer.BeginArray("Timestamps"sv);
+ for (auto& Kv : Index)
+ {
+ GcClock::Tick AccessTime = AccessTimes[Kv.second];
+ Writer.AddInteger(AccessTime);
}
Writer.EndArray();
- Writer.BeginArray("RawInfo"sv);
+ Writer.BeginArray("RawHash"sv);
+ for (auto& Kv : Index)
{
- for (auto& Kv : m_Index)
- {
- const IoHash& Key = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- if (Payload.RawHash != IoHash::Zero)
- {
- Writer.BeginObject();
- Writer << "Key"sv << Key;
- Writer << "RawHash"sv << Payload.RawHash;
- Writer << "RawSize"sv << Payload.RawSize;
- Writer.EndObject();
- }
- }
+ const BucketPayload& Payload = Payloads[Kv.second];
+ Writer.AddHash(Payload.RawHash);
}
Writer.EndArray();
- }
- try
- {
- SaveCompactBinaryObject(m_BucketDir / "zen_manifest", Writer.Save());
- }
- catch (std::exception& Err)
- {
- ZEN_WARN("writing manifest FAILED, reason: '{}'", Err.what());
+ Writer.BeginArray("RawSize"sv);
+ for (auto& Kv : Index)
+ {
+ const BucketPayload& Payload = Payloads[Kv.second];
+ Writer.AddInteger(Payload.RawSize);
+ }
+ Writer.EndArray();
}
+ return Writer.Save();
}
IoHash
@@ -1200,7 +1279,10 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
ExpiredKeys.reserve(1024);
std::vector<IoHash> Cids;
- Cids.reserve(1024);
+ if (!GcCtx.SkipCid())
+ {
+ Cids.reserve(1024);
+ }
for (const auto& Entry : Index)
{
@@ -1298,8 +1380,25 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
MovedCount,
TotalChunkCount,
NiceBytes(OldTotalSize));
- RwLock::SharedLockScope _(m_IndexLock);
- SaveManifest();
+
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
+ {
+ return;
+ }
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
+ IndexMap Index;
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ MakeIndexSnapshot();
+ Index = m_Index;
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ }
+ SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads)));
});
m_SlogFile.Flush();
@@ -1360,48 +1459,63 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
IndexMap Index;
BlockStore::ReclaimSnapshotState BlockStoreState;
{
- ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
-
- RwLock::SharedLockScope __(m_IndexLock);
- Stopwatch Timer;
- const auto ____ = MakeGuard([&] {
- uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
- });
- if (m_Index.empty())
+ bool Expected = false;
+ if (m_IsFlushing || !m_IsFlushing.compare_exchange_strong(Expected, true))
{
- ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir);
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is currently flushing", m_BucketDir);
return;
}
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
-
- SaveManifest();
- Index = m_Index;
+ auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
- for (const IoHash& Key : DeleteCacheKeys)
+ std::vector<AccessTime> AccessTimes;
+ std::vector<BucketPayload> Payloads;
{
- if (auto It = Index.find(Key); It != Index.end())
+ ZEN_TRACE_CPU("Z$::Disk::Bucket::CollectGarbage::State");
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+
+ Stopwatch Timer;
+ const auto ____ = MakeGuard([&] {
+ uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ });
+ if (m_Index.empty())
{
- const BucketPayload& Payload = m_Payloads[It->second];
- DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location};
- if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
+ ZEN_DEBUG("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir);
+ return;
+ }
+
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+
+ Payloads = m_Payloads;
+ AccessTimes = m_AccessTimes;
+ Index = m_Index;
+
+ for (const IoHash& Key : DeleteCacheKeys)
+ {
+ if (auto It = Index.find(Key); It != Index.end())
{
- Entry.Location.Flags |= DiskLocation::kTombStone;
- ExpiredStandaloneEntries.push_back(Entry);
+ const BucketPayload& Payload = m_Payloads[It->second];
+ DiskIndexEntry Entry = {.Key = It->first, .Location = Payload.Location};
+ if (Entry.Location.Flags & DiskLocation::kStandaloneFile)
+ {
+ Entry.Location.Flags |= DiskLocation::kTombStone;
+ ExpiredStandaloneEntries.push_back(Entry);
+ }
}
}
- }
- if (GcCtx.IsDeletionMode())
- {
- for (const auto& Entry : ExpiredStandaloneEntries)
+ if (GcCtx.IsDeletionMode())
{
- m_Index.erase(Entry.Key);
- m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
- DeletedChunks.insert(Entry.Key);
+ for (const auto& Entry : ExpiredStandaloneEntries)
+ {
+ m_Index.erase(Entry.Key);
+ m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ DeletedChunks.insert(Entry.Key);
+ }
+ m_SlogFile.Append(ExpiredStandaloneEntries);
}
- m_SlogFile.Append(ExpiredStandaloneEntries);
}
+ SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), std::move(Payloads)));
}
if (GcCtx.IsDeletionMode())
diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h
index 80c643afa..c4bedfee8 100644
--- a/src/zenserver/cache/cachedisklayer.h
+++ b/src/zenserver/cache/cachedisklayer.h
@@ -188,6 +188,7 @@ private:
BlockStore m_BlockStore;
Oid m_BucketId;
uint64_t m_LargeObjectThreshold = 128 * 1024;
+ std::atomic_bool m_IsFlushing{};
// These files are used to manage storage of small objects for this bucket
@@ -221,16 +222,17 @@ private:
std::atomic_uint64_t m_TotalStandaloneSize{};
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
- uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
- void OpenLog(const bool IsNew);
- void SaveManifest();
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const;
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const;
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const;
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
+ uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
+ void OpenLog(const bool IsNew);
+ CbObject MakeManifest(IndexMap&& Index, std::vector<AccessTime>&& AccessTimes, std::vector<BucketPayload>&& Payloads);
+ void SaveManifest(CbObject&& Manifest);
CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, size_t Index) const;
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 0a2947b16..1b6eeca3a 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -1030,46 +1030,50 @@ TEST_CASE("z$.gc")
{
ScopedTemporaryDirectory TempDir;
GcManager Gc;
- ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
- const auto Bucket = "rightintwo"sv;
-
- std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
-
- for (const auto& Key : Keys)
{
- IoBuffer Value = testutils::CreateBinaryCacheValue(128);
- Zcs.Put(Bucket, Key, {.Value = Value});
- }
-
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2));
- GcCtx.CollectSmallObjects(true);
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
+ const auto Bucket = "rightintwo"sv;
- Gc.CollectGarbage(GcCtx);
+ std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)};
for (const auto& Key : Keys)
{
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(Exists);
+ IoBuffer Value = testutils::CreateBinaryCacheValue(128);
+ Zcs.Put(Bucket, Key, {.Value = Value});
}
- }
- // Move forward in time and collect again
- {
- GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
+ {
+ GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2));
+ GcCtx.CollectSmallObjects(true);
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
+ Gc.CollectGarbage(GcCtx);
- for (const auto& Key : Keys)
- {
- ZenCacheValue CacheValue;
- const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
- CHECK(!Exists);
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(Exists);
+ }
}
+ // Move forward in time and collect again
+ {
+ GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2));
+ GcCtx.CollectSmallObjects(true);
+
+ Zcs.Flush();
+ Gc.CollectGarbage(GcCtx);
+
+ for (const auto& Key : Keys)
+ {
+ ZenCacheValue CacheValue;
+ const bool Exists = Zcs.Get(Bucket, Key, CacheValue);
+ CHECK(!Exists);
+ }
+ }
+ }
+ {
+ ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache");
CHECK_EQ(0, Zcs.StorageSize().DiskSize);
}
}
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 4ddbdded7..4402e4486 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1239,15 +1239,37 @@ ProjectStore::Project::ReadAccessTimes()
if (ValidationError == CbValidateError::None)
{
- CbObject Reader = LoadCompactBinaryObject(Obj);
- CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView();
+ CbObject Reader = LoadCompactBinaryObject(Obj);
- for (CbFieldView& Entry : LastAccessTimes)
+ uint64_t Count = Reader["count"sv].AsUInt64(0);
+ if (Count > 0)
{
- CbObjectView AccessTime = Entry.AsObjectView();
- std::string_view Id = AccessTime["id"sv].AsString();
- GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64();
- m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick);
+ std::vector<uint64_t> Ticks;
+ Ticks.reserve(Count);
+ CbArrayView TicksArray = Reader["ticks"sv].AsArrayView();
+ for (CbFieldView& TickView : TicksArray)
+ {
+ Ticks.emplace_back(TickView.AsUInt64());
+ }
+ CbArrayView IdArray = Reader["ids"sv].AsArrayView();
+ uint64_t Index = 0;
+ for (CbFieldView& IdView : IdArray)
+ {
+ std::string_view Id = IdView.AsString();
+ m_LastAccessTimes.insert_or_assign(std::string(Id), Ticks[Index++]);
+ }
+ }
+
+ ////// Legacy format read
+ {
+ CbArrayView LastAccessTimes = Reader["lastaccess"sv].AsArrayView();
+ for (CbFieldView& Entry : LastAccessTimes)
+ {
+ CbObjectView AccessTime = Entry.AsObjectView();
+ std::string_view Id = AccessTime["id"sv].AsString();
+ GcClock::Tick AccessTick = AccessTime["tick"sv].AsUInt64();
+ m_LastAccessTimes.insert_or_assign(std::string(Id), AccessTick);
+ }
}
}
else
@@ -1261,26 +1283,27 @@ ProjectStore::Project::WriteAccessTimes()
{
using namespace std::literals;
- RwLock::ExclusiveLockScope _(m_ProjectLock);
+ CbObjectWriter Writer;
- BinaryWriter Mem;
+ Writer.AddInteger("count", gsl::narrow<uint64_t>(m_LastAccessTimes.size()));
+ Writer.BeginArray("ids");
- CbObjectWriter Writer;
- Writer.BeginArray("lastaccess");
{
+ RwLock::SharedLockScope _(m_ProjectLock);
for (const auto& It : m_LastAccessTimes)
{
- Writer.BeginObject();
- {
- Writer << "id"sv << It.first;
- Writer << "tick"sv << gsl::narrow<uint64_t>(It.second);
- }
- Writer.EndObject();
+ Writer << It.first;
}
+ Writer.EndArray();
+ Writer.BeginArray("ticks");
+ for (const auto& It : m_LastAccessTimes)
+ {
+ Writer << gsl::narrow<uint64_t>(It.second);
+ }
+ Writer.EndArray();
}
- Writer.EndArray();
- Writer.Save(Mem);
+ CbObject Data = Writer.Save();
try
{
@@ -1290,10 +1313,7 @@ ProjectStore::Project::WriteAccessTimes()
ZEN_INFO("persisting access times for project '{}' to {}", Identifier, ProjectAccessTimesFilePath);
- BasicFile Blob;
- Blob.Open(ProjectAccessTimesFilePath, BasicFile::Mode::kTruncate);
- Blob.Write(Mem.Data(), Mem.Size(), 0);
- Blob.Flush();
+ WriteFile(ProjectAccessTimesFilePath, Data.GetBuffer().AsIoBuffer());
}
catch (std::exception& Err)
{
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index b5ed17fc6..f99b0bc4a 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -233,7 +233,6 @@ BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations)
if (!KnownBlocks.contains(BlockIndex))
{
Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex];
- m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
BlocksToDelete.push_back(BlockIndex);
}
}
@@ -242,6 +241,7 @@ BlockStore::Prune(const std::vector<BlockStoreLocation>& KnownLocations)
{
// Clear out unused blocks
Ref<BlockStoreFile> BlockFile = m_ChunkBlocks[BlockIndex];
+ m_TotalSize.fetch_sub(BlockFile->FileSize(), std::memory_order::relaxed);
m_ChunkBlocks.erase(BlockIndex);
ZEN_DEBUG("marking block store file '{}' for delete, block #{}", BlockFile->GetPath(), BlockIndex);
BlockFile->MarkAsDeleteOnClose();
@@ -354,22 +354,31 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
}
void
-BlockStore::Flush()
+BlockStore::Flush(bool ForceNewBlock)
{
ZEN_TRACE_CPU("BlockStore::Flush");
- RwLock::ExclusiveLockScope _(m_InsertLock);
- if (m_CurrentInsertOffset > 0)
+ if (ForceNewBlock)
{
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
- if (m_WriteBlock)
+ RwLock::ExclusiveLockScope _(m_InsertLock);
+ if (m_CurrentInsertOffset > 0)
{
- m_WriteBlock->Flush();
+ uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ if (m_WriteBlock)
+ {
+ m_WriteBlock->Flush();
+ }
+ m_WriteBlock = nullptr;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_CurrentInsertOffset = 0;
}
- m_WriteBlock = nullptr;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
- m_CurrentInsertOffset = 0;
+ return;
+ }
+ RwLock::SharedLockScope _(m_InsertLock);
+ if (m_WriteBlock)
+ {
+ m_WriteBlock->Flush();
}
}
@@ -449,11 +458,6 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
{
const BlockStoreLocation& Location = ChunkLocations[Index];
OldTotalSize += Location.Size;
- if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex))
- {
- continue;
- }
-
auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
size_t ChunkMapIndex = 0;
if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
@@ -524,9 +528,12 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
uint32_t NewBlockIndex = 0;
for (uint32_t BlockIndex : BlocksToReWrite)
{
+ bool IsActiveWriteBlock = Snapshot.m_ActiveWriteBlocks.contains(BlockIndex);
+
const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
Ref<BlockStoreFile> OldBlockFile;
+ if (!IsActiveWriteBlock)
{
RwLock::SharedLockScope _i(m_InsertLock);
Stopwatch Timer;
@@ -553,6 +560,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
ChangeCallback({}, DeleteMap);
DeletedCount += DeleteMap.size();
+ if (OldBlockFile)
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
Stopwatch Timer;
@@ -561,18 +569,15 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- if (OldBlockFile)
- {
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
- m_ChunkBlocks.erase(BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
+ OldBlockFile->MarkAsDeleteOnClose();
}
continue;
}
- else if (!OldBlockFile)
+ else if (!OldBlockFile && !IsActiveWriteBlock)
{
// If the block file pointed to does not exist, move any keep chunk them to deleted list
ZEN_ERROR("Expected to find block {} in {} - this should never happen, marking {} entries as deleted.",
@@ -585,6 +590,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
MovedChunksArray MovedChunks;
+ if (OldBlockFile)
{
ZEN_TRACE_CPU("BlockStore::ReclaimSpace::MoveBlock");
std::vector<uint8_t> Chunk;
@@ -689,9 +695,11 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
}
ChangeCallback(MovedChunks, DeleteMap);
- MovedCount += KeepMap.size();
+ MovedCount += MovedChunks.size();
DeletedCount += DeleteMap.size();
MovedChunks.clear();
+
+ if (OldBlockFile)
{
RwLock::ExclusiveLockScope __(m_InsertLock);
Stopwatch Timer;
@@ -700,14 +708,12 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
ReadBlockTimeUs += ElapsedUs;
ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
- if (OldBlockFile)
- {
- ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
- ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
- m_ChunkBlocks.erase(BlockIndex);
- m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
- OldBlockFile->MarkAsDeleteOnClose();
- }
+
+ ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ ZEN_ASSERT(m_ChunkBlocks[BlockIndex] == OldBlockFile);
+ m_ChunkBlocks.erase(BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
+ OldBlockFile->MarkAsDeleteOnClose();
}
}
if (NewBlockFile)
@@ -1117,7 +1123,7 @@ TEST_CASE("blockstore.clean.stray.blocks")
CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
}
-TEST_CASE("blockstore.flush.forces.new.block")
+TEST_CASE("blockstore.flush.force.new.block")
{
using namespace blockstore::impl;
@@ -1129,10 +1135,10 @@ TEST_CASE("blockstore.flush.forces.new.block")
std::string FirstChunkData = "This is the data of the first chunk that we will write";
WriteStringAsChunk(Store, FirstChunkData, 4);
- Store.Flush();
+ Store.Flush(/*ForceNewBlock*/ true);
std::string SecondChunkData = "This is the data for the second chunk that we will write";
WriteStringAsChunk(Store, SecondChunkData, 4);
- Store.Flush();
+ Store.Flush(/*ForceNewBlock*/ true);
std::string ThirdChunkData =
"This is a much longer string that will not fit in the first block so it should be placed in the second block";
WriteStringAsChunk(Store, ThirdChunkData, 4);
@@ -1157,7 +1163,7 @@ TEST_CASE("blockstore.iterate.chunks")
std::string SecondChunkData = "This is the data for the second chunk that we will write";
BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
- Store.Flush();
+ Store.Flush(/*ForceNewBlock*/ false);
std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
@@ -1267,7 +1273,7 @@ TEST_CASE("blockstore.reclaim.space")
ChunksToKeep.push_back(ChunkIndex);
}
- Store.Flush();
+ Store.Flush(/*ForceNewBlock*/ false);
BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 1d1797597..ce2e53527 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -230,7 +230,7 @@ CasContainerStrategy::FilterChunks(HashKeySet& InOutChunks)
void
CasContainerStrategy::Flush()
{
- m_BlockStore.Flush();
+ m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_CasLog.Flush();
MakeIndexSnapshot();
}
@@ -801,7 +801,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
auto BlockIt = BlockSizes.find(DiskLocation.GetBlockIndex());
if (BlockIt == BlockSizes.end())
{
- ZEN_WARN("Unknown block {} for entry {}", DiskLocation.GetBlockIndex(), Entry.first.ToHexString());
+ ZEN_WARN("Unknown block {} for entry {} in '{}'", DiskLocation.GetBlockIndex(), Entry.first.ToHexString(), BasePath);
}
else
{
@@ -810,7 +810,10 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
uint64_t BlockSize = BlockIt->second;
if (BlockLocation.Offset + BlockLocation.Size > BlockSize)
{
- ZEN_WARN("Range is outside of block {} for entry {}", BlockLocation.BlockIndex, Entry.first.ToHexString());
+ ZEN_WARN("Range is outside of block {} for entry {} in '{}'",
+ BlockLocation.BlockIndex,
+ Entry.first.ToHexString(),
+ BasePath);
}
else
{
@@ -1068,7 +1071,6 @@ TEST_CASE("compactcas.gc.removefile")
TEST_CASE("compactcas.gc.compact")
{
- // for (uint32_t i = 0; i < 100; ++i)
{
ScopedTemporaryDirectory TempDir;
@@ -1111,6 +1113,17 @@ TEST_CASE("compactcas.gc.compact")
CHECK(Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
+ auto ValidateChunkExists = [&](size_t Index) {
+ IoBuffer Chunk = Cas.FindChunk(ChunkHashes[Index]);
+ bool Exists = !!Chunk;
+ CHECK(Exists);
+ IoHash Hash = IoHash::HashBuffer(Chunk);
+ if (ChunkHashes[Index] != Hash)
+ {
+ CHECK(fmt::format("{}", ChunkHashes[Index]) == fmt::format("{}", Hash));
+ }
+ };
+
// Keep first and last
{
GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
@@ -1134,8 +1147,8 @@ TEST_CASE("compactcas.gc.compact")
CHECK(!Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ ValidateChunkExists(0);
+ ValidateChunkExists(8);
Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
@@ -1167,7 +1180,7 @@ TEST_CASE("compactcas.gc.compact")
CHECK(!Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ ValidateChunkExists(8);
Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
@@ -1201,9 +1214,9 @@ TEST_CASE("compactcas.gc.compact")
CHECK(Cas.HaveChunk(ChunkHashes[7]));
CHECK(!Cas.HaveChunk(ChunkHashes[8]));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
+ ValidateChunkExists(1);
+ ValidateChunkExists(4);
+ ValidateChunkExists(7);
Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
@@ -1236,9 +1249,9 @@ TEST_CASE("compactcas.gc.compact")
CHECK(Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ ValidateChunkExists(6);
+ ValidateChunkExists(7);
+ ValidateChunkExists(8);
Cas.InsertChunk(Chunks[0], ChunkHashes[0]);
Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
@@ -1273,11 +1286,11 @@ TEST_CASE("compactcas.gc.compact")
CHECK(!Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ ValidateChunkExists(0);
+ ValidateChunkExists(2);
+ ValidateChunkExists(4);
+ ValidateChunkExists(6);
+ ValidateChunkExists(8);
Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
@@ -1286,15 +1299,15 @@ TEST_CASE("compactcas.gc.compact")
}
// Verify that we nicely appended blocks even after all GC operations
- CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
- CHECK(ChunkHashes[1] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[1])));
- CHECK(ChunkHashes[2] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[2])));
- CHECK(ChunkHashes[3] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[3])));
- CHECK(ChunkHashes[4] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[4])));
- CHECK(ChunkHashes[5] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[5])));
- CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
- CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
- CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
+ ValidateChunkExists(0);
+ ValidateChunkExists(1);
+ ValidateChunkExists(2);
+ ValidateChunkExists(3);
+ ValidateChunkExists(4);
+ ValidateChunkExists(5);
+ ValidateChunkExists(6);
+ ValidateChunkExists(7);
+ ValidateChunkExists(8);
}
}
@@ -1497,6 +1510,7 @@ TEST_CASE("compactcas.threadedinsert")
IoBuffer Chunk = CreateRandomChunk(kChunkSize);
IoHash Hash = HashBuffer(Chunk);
NewChunks[Hash] = Chunk;
+ GcChunkHashes.insert(Hash);
}
std::atomic_uint32_t AddedChunkCount;
@@ -1522,42 +1536,40 @@ TEST_CASE("compactcas.threadedinsert")
});
}
- while (AddedChunkCount.load() < NewChunks.size())
+ std::unordered_set<IoHash, IoHash::Hasher> ChunksToDelete;
+ std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
+ size_t C = 0;
+ while (C < KeepHashes.size())
{
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
+ if (C % 155 == 0)
{
- if (Cas.HaveChunk(Chunk.first))
+ if (C < KeepHashes.size() - 1)
{
- GcChunkHashes.emplace(Chunk.first);
+ ChunksToDelete.insert(KeepHashes[C]);
+ KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
}
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
+ if (C + 3 < KeepHashes.size() - 1)
{
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
+ ChunksToDelete.insert(KeepHashes[C + 3]);
+ KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
+ KeepHashes.pop_back();
}
- C++;
}
+ C++;
+ }
+ while (AddedChunkCount.load() < NewChunks.size())
+ {
GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
GcCtx.CollectSmallObjects(true);
GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
+ CHECK(ChunksToDelete.contains(ChunkHash));
+ GcChunkHashes.erase(ChunkHash);
+ });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -1565,40 +1577,15 @@ TEST_CASE("compactcas.threadedinsert")
Sleep(1);
}
- // Need to be careful since we might GC blocks we don't know outside of RwLock::ExclusiveLockScope
- for (const auto& Chunk : NewChunks)
- {
- if (Cas.HaveChunk(Chunk.first))
- {
- GcChunkHashes.emplace(Chunk.first);
- }
- }
- std::vector<IoHash> KeepHashes(GcChunkHashes.begin(), GcChunkHashes.end());
- size_t C = 0;
- while (C < KeepHashes.size())
- {
- if (C % 155 == 0)
- {
- if (C < KeepHashes.size() - 1)
- {
- KeepHashes[C] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- if (C + 3 < KeepHashes.size() - 1)
- {
- KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1];
- KeepHashes.pop_back();
- }
- }
- C++;
- }
-
GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24));
GcCtx.CollectSmallObjects(true);
GcCtx.AddRetainedCids(KeepHashes);
Cas.CollectGarbage(GcCtx);
const HashKeySet& Deleted = GcCtx.DeletedCids();
- Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ Deleted.IterateHashes([&GcChunkHashes, &ChunksToDelete](const IoHash& ChunkHash) {
+ CHECK(ChunksToDelete.contains(ChunkHash));
+ GcChunkHashes.erase(ChunkHash);
+ });
}
{
WorkCompleted = 0;
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 0d742d7e1..fe568a487 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -795,6 +795,21 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&&
}
void
+FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback)
+{
+ ZEN_TRACE_CPU("FileCas::IterateChunks");
+
+ ZEN_ASSERT(m_IsInitialized);
+
+ RwLock::SharedLockScope _(m_Lock);
+ for (const auto& It : m_Index)
+ {
+ const IoHash& NameHash = It.first;
+ Callback(NameHash, It.second.Size);
+ }
+}
+
+void
FileCasStrategy::Flush()
{
ZEN_TRACE_CPU("FileCas::Flush");
@@ -927,7 +942,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
{
ZEN_TRACE_CPU("FileCas::CollectGarbage::Filter");
- IterateChunks([&](const IoHash& Hash, IoBuffer&& Payload) {
+ IterateChunks([&](const IoHash& Hash, uint64_t Size) {
bool KeepThis = false;
CandidateCas[0] = Hash;
GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
@@ -935,16 +950,14 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
KeepThis = true;
});
- const uint64_t FileSize = Payload.GetSize();
-
if (!KeepThis)
{
ChunksToDelete.push_back(Hash);
- ChunksToDeleteBytes.fetch_add(FileSize);
+ ChunksToDeleteBytes.fetch_add(Size);
}
++ChunkCount;
- ChunkBytes.fetch_add(FileSize);
+ ChunkBytes.fetch_add(Size);
});
}
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index d9186f05b..10c181c0b 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -86,6 +86,7 @@ private:
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
+ void IterateChunks(std::function<void(const IoHash& Hash, uint64_t PayloadSize)>&& Callback);
void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
struct ShardingHelper
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index edd6df5a2..6fc0652f2 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -139,7 +139,7 @@ public:
void WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback);
IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
- void Flush();
+ void Flush(bool ForceNewBlock);
ReclaimSnapshotState GetReclaimSnapshotState();
void ReclaimSpace(