aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-11-24 13:20:59 +0100
committerGitHub <[email protected]>2022-11-24 04:20:59 -0800
commit666a543ed82896c972526ef08476a41ccbfbd2c4 (patch)
tree49a52941d9ced665431ebf320d0f7d0f4b6e5cfa
parentDon't resize block store block file to max size at creation (#193) (diff)
downloadzen-666a543ed82896c972526ef08476a41ccbfbd2c4.tar.xz
zen-666a543ed82896c972526ef08476a41ccbfbd2c4.zip
Fix disk usage stats (#194)
* Improve tracking of used disk space for filecas and compactcas. Add tracking of used disk space for project store. Remove ZenCacheStore as GcStorage/GcContributor: the underlying ZenCacheNamespace instances register themselves directly, and removing this also fixes double reporting of GcStorageSize for namespaces. * changelog
-rw-r--r--CHANGELOG.md4
-rw-r--r--zenserver/cache/structuredcachestore.cpp94
-rw-r--r--zenserver/cache/structuredcachestore.h10
-rw-r--r--zenserver/projectstore.cpp44
-rw-r--r--zenserver/projectstore.h4
-rw-r--r--zenstore/blockstore.cpp25
-rw-r--r--zenstore/compactcas.cpp29
-rw-r--r--zenstore/compactcas.h4
-rw-r--r--zenstore/filecas.cpp287
-rw-r--r--zenstore/filecas.h18
-rw-r--r--zenstore/gc.cpp4
-rw-r--r--zenstore/include/zenstore/blockstore.h5
12 files changed, 357 insertions, 171 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82c062130..9aae05f5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,10 @@
- `<host>/health/version`
- Feature: Configure OpenID providers from cmd line and Lua cfg
- Feature: Added zen command line executable to release distribution
+- Bugfix: Fix double reporting of disk usage for namespaces
+- Bugfix: Fix double garbage collection analysis of namespaces
+- Improvement: Improve tracking of used disk space for filecas and compactcas
+- Improvement: Add tracking of used disk space for project store
- Improvement: Bumped limit for storing cache values as separate files to reduce number of loose files
- Improvement: Optimizations when handling compressed buffer (less materialization and reading of headers)
- Improvement: Send attachments as file references if the IoBuffer we find represents a complete file and `AcceptFlags` in RPC request allows it.
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 1fab3b312..9773c2ed2 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -552,8 +552,17 @@ void
ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
{
+ BucketValue IndexValue(Value.Value, GcClock::TickCount());
RwLock::ExclusiveLockScope _(m_BucketLock);
- m_CacheMap.insert_or_assign(HashKey, BucketValue(Value.Value, GcClock::TickCount()));
+ if (auto It = m_CacheMap.find(HashKey); It != m_CacheMap.end())
+ {
+ m_TotalSize.fetch_sub(It->second.Payload.GetSize(), std::memory_order::relaxed);
+ It.value() = std::move(IndexValue);
+ }
+ else
+ {
+ m_CacheMap.insert_or_assign(HashKey, std::move(IndexValue));
+ }
}
m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed);
@@ -564,6 +573,7 @@ ZenCacheMemoryLayer::CacheBucket::Drop()
{
RwLock::ExclusiveLockScope _(m_BucketLock);
m_CacheMap.clear();
+ m_TotalSize.store(0);
}
//////////////////////////////////////////////////////////////////////////
@@ -820,7 +830,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
{
m_BucketDir = BucketDir;
- m_TotalSize = 0;
+ m_TotalStandaloneSize = 0;
m_Index.clear();
@@ -846,9 +856,9 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
for (const auto& Entry : m_Index)
{
const DiskLocation& Location = Entry.second.Location;
- m_TotalSize.fetch_add(Location.Size(), std::memory_order::relaxed);
if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
+ m_TotalStandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed);
continue;
}
const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_PayloadAlignment);
@@ -1153,6 +1163,23 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
m_Index.erase(BadKey);
}
}
+ for (const DiskIndexEntry& Entry : LogEntries)
+ {
+ if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ ExtendablePathBuilder<256> Path;
+ BuildPath(Path, Entry.Key);
+ fs::path FilePath = Path.ToPath();
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Entry.Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting bad standalone cache file '{}'", Path.ToUtf8());
+ std::error_code Ec;
+ fs::remove(FilePath, Ec); // We don't care if we fail, we are no longer tracking this file...
+ }
+ m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
+ }
+ }
m_SlogFile.Append(LogEntries);
}
}
@@ -1277,7 +1304,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
uint64_t ReadBlockLongestTimeUs = 0;
uint64_t TotalChunkCount = 0;
uint64_t DeletedSize = 0;
- uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
+ uint64_t OldTotalSize = TotalSize();
uint64_t DeletedCount = 0;
uint64_t MovedCount = 0;
@@ -1359,6 +1386,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
for (const auto& Entry : ExpiredStandaloneEntries)
{
m_Index.erase(Entry.Key);
+ m_TotalStandaloneSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
}
m_SlogFile.Append(ExpiredStandaloneEntries);
}
@@ -1422,10 +1450,9 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
}
m_SlogFile.Append(DiskIndexEntry{.Key = Key, .Location = RestoreLocation});
m_Index.insert({Key, {Loc, GcClock::TickCount()}});
- m_TotalSize.fetch_add(Entry.Location.Size(), std::memory_order::relaxed);
+ m_TotalStandaloneSize.fetch_add(RestoreLocation.Size(), std::memory_order::relaxed);
continue;
}
- m_TotalSize.fetch_sub(Entry.Location.Size(), std::memory_order::relaxed);
DeletedSize += Entry.Location.Size();
DeletedCount++;
}
@@ -1478,13 +1505,13 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (!PerformDelete)
{
m_BlockStore.ReclaimSpace(BlockStoreState, ChunkLocations, KeepChunkIndexes, m_PayloadAlignment, true);
- uint64_t TotalSize = m_TotalSize.load(std::memory_order_relaxed);
+ uint64_t CurrentTotalSize = TotalSize();
ZEN_INFO("garbage collect from '{}' DISABLED, found #{} {} chunks of total #{} {}",
m_BucketDir / m_BucketName,
DeleteCount,
0, // NiceBytes(TotalSize - NewTotalSize),
- TotalChunkCount,
- NiceBytes(TotalSize));
+ CurrentTotalSize,
+ NiceBytes(CurrentTotalSize));
return;
}
@@ -1533,8 +1560,6 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
if (Entry.Location.GetFlags() & DiskLocation::kTombStone)
{
m_Index.erase(Entry.Key);
- uint64_t ChunkSize = Entry.Location.GetBlockLocation(m_PayloadAlignment).Size;
- m_TotalSize.fetch_sub(ChunkSize);
continue;
}
m_Index[Entry.Key].Location = Entry.Location;
@@ -1706,29 +1731,21 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
DiskLocation Loc(NewFileSize, EntryFlags);
IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
- uint64_t OldFileSize = 0;
RwLock::ExclusiveLockScope _(m_IndexLock);
if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
// Previously unknown object
- m_Index.insert({HashKey, Entry});
+ m_Index.insert_or_assign(HashKey, std::move(Entry));
}
else
{
// TODO: should check if write is idempotent and bail out if it is?
- OldFileSize = It.value().Location.Size();
- It.value() = Entry;
+ m_TotalStandaloneSize.fetch_sub(It.value().Location.Size(), std::memory_order::relaxed);
+ It.value() = std::move(Entry);
}
m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- if (OldFileSize <= NewFileSize)
- {
- m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed);
- }
- else
- {
- m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed);
- }
+ m_TotalStandaloneSize.fetch_add(NewFileSize, std::memory_order::relaxed);
}
void
@@ -1764,7 +1781,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
}
});
- m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed);
}
//////////////////////////////////////////////////////////////////////////
@@ -2010,9 +2026,9 @@ ZenCacheDiskLayer::TotalSize() const
static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
-: GcStorage(Gc)
-, GcContributor(Gc)
-, m_Gc(Gc)
+//: GcStorage(Gc)
+//, GcContributor(Gc)
+: m_Gc(Gc)
, m_Configuration(Configuration)
{
CreateDirectories(m_Configuration.BasePath);
@@ -2170,18 +2186,6 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names
}
}
-void
-ZenCacheStore::GatherReferences(GcContext& GcCtx)
-{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.GatherReferences(GcCtx); });
-}
-
-void
-ZenCacheStore::CollectGarbage(GcContext& GcCtx)
-{
- IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.CollectGarbage(GcCtx); });
-}
-
GcStorageSize
ZenCacheStore::StorageSize() const
{
@@ -2297,8 +2301,8 @@ TEST_CASE("z$.size")
}
CacheSize = Zcs.StorageSize();
- CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize);
- CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.MemorySize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize);
}
{
@@ -2307,7 +2311,7 @@ TEST_CASE("z$.size")
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
- CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
for (size_t Bucket = 0; Bucket < 4; ++Bucket)
{
@@ -2340,7 +2344,7 @@ TEST_CASE("z$.size")
}
CacheSize = Zcs.StorageSize();
- CHECK_EQ(CacheValue.GetSize() * Count, CacheSize.DiskSize);
+ CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize);
CHECK_EQ(0, CacheSize.MemorySize);
}
@@ -2350,7 +2354,7 @@ TEST_CASE("z$.size")
const GcStorageSize SerializedSize = Zcs.StorageSize();
CHECK_EQ(SerializedSize.MemorySize, 0);
- CHECK_EQ(SerializedSize.DiskSize, CacheSize.DiskSize);
+ CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize);
for (size_t Bucket = 0; Bucket < 4; ++Bucket)
{
@@ -2606,7 +2610,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
}
const uint64_t TotalSize = Zcs.StorageSize().DiskSize;
- CHECK_EQ(kChunkSize * Chunks.size(), TotalSize);
+ CHECK_LE(kChunkSize * Chunks.size(), TotalSize);
{
std::atomic<size_t> WorkCompleted = 0;
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index ae1fdf3d3..d5cb536e5 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -245,7 +245,7 @@ private:
void CollectGarbage(GcContext& GcCtx);
void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
- inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
+ inline uint64_t TotalSize() const { return m_TotalStandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(); }
private:
const uint64_t MaxBlockSize = 1ull << 30;
@@ -288,7 +288,7 @@ private:
RwLock m_IndexLock;
IndexMap m_Index;
- std::atomic_uint64_t m_TotalSize{};
+ std::atomic_uint64_t m_TotalStandaloneSize{};
void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
@@ -353,7 +353,7 @@ private:
ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
};
-class ZenCacheStore final : public GcStorage, public GcContributor
+class ZenCacheStore final
{
public:
static constexpr std::string_view DefaultNamespace =
@@ -376,9 +376,7 @@ public:
void Flush();
void Scrub(ScrubContext& Ctx);
- virtual void GatherReferences(GcContext& GcCtx) override;
- virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override;
+ GcStorageSize StorageSize() const;
private:
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 2268b5caf..87118991e 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -111,6 +111,12 @@ struct ProjectStore::OplogStorage : public RefCounted
static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); }
+ uint64_t OpBlobsSize() const
+ {
+ RwLock::SharedLockScope _(m_RwLock);
+ return m_NextOpsOffset;
+ }
+
void Open(bool IsCreate)
{
using namespace std::literals;
@@ -256,7 +262,7 @@ struct ProjectStore::OplogStorage : public RefCounted
private:
ProjectStore::Oplog* m_OwnerOplog;
std::filesystem::path m_OplogStoragePath;
- RwLock m_RwLock;
+ mutable RwLock m_RwLock;
TCasLogFile<OplogEntry> m_Oplog;
BasicFile m_OpBlobs;
std::atomic<uint64_t> m_NextOpsOffset{0};
@@ -329,6 +335,17 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
GcCtx.AddRetainedCids(Hashes);
}
+uint64_t
+ProjectStore::Oplog::TotalSize() const
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+ if (m_Storage)
+ {
+ return m_Storage->OpBlobsSize();
+ }
+ return 0;
+}
+
std::filesystem::path
ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
{
@@ -912,6 +929,20 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
IterateOplogs([&](Oplog& Ops) { Ops.GatherReferences(GcCtx); });
}
+uint64_t
+ProjectStore::Project::TotalSize() const
+{
+ uint64_t Result = 0;
+ {
+ RwLock::SharedLockScope _(m_ProjectLock);
+ for (const auto& It : m_Oplogs)
+ {
+ Result += It.second->TotalSize();
+ }
+ }
+ return Result;
+}
+
bool
ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath)
{
@@ -1160,7 +1191,16 @@ ProjectStore::CollectGarbage(GcContext& GcCtx)
GcStorageSize
ProjectStore::StorageSize() const
{
- return {0, 0};
+ GcStorageSize Result;
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+ for (auto& Kv : m_Projects)
+ {
+ const Ref<Project>& Project = Kv.second;
+ Result.DiskSize += Project->TotalSize();
+ }
+ }
+ return Result;
}
Ref<ProjectStore::Project>
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index c62e7840d..9d909de8c 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -118,6 +118,7 @@ public:
void Flush();
void Scrub(ScrubContext& Ctx) const;
void GatherReferences(GcContext& GcCtx);
+ uint64_t TotalSize() const;
std::size_t OplogCount() const
{
@@ -188,6 +189,7 @@ public:
void Scrub(ScrubContext& Ctx);
spdlog::logger& Log();
void GatherReferences(GcContext& GcCtx);
+ uint64_t TotalSize() const;
bool PrepareForDelete(std::filesystem::path& OutDeletePath);
private:
@@ -246,7 +248,7 @@ private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
std::filesystem::path m_ProjectBasePath;
- RwLock m_ProjectsLock;
+ mutable RwLock m_ProjectsLock;
std::map<std::string, Ref<Project>> m_Projects;
std::filesystem::path BasePathForProject(std::string_view ProjectId);
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 682cc4472..c8bf482fa 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -95,12 +95,6 @@ BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
}
void
-BlockStoreFile::Truncate(uint64_t Size)
-{
- m_File.SetFileSize(Size);
-}
-
-void
BlockStoreFile::Flush()
{
m_File.Flush();
@@ -130,6 +124,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
ZEN_ASSERT(MaxBlockCount > 0);
ZEN_ASSERT(IsPow2(MaxBlockCount));
+ m_TotalSize = 0;
m_BlocksBasePath = BlocksBasePath;
m_MaxBlockSize = MaxBlockSize;
@@ -184,6 +179,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
}
Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
BlockFile->Open();
+ m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
m_ChunkBlocks[BlockIndex] = BlockFile;
}
}
@@ -248,13 +244,15 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, cons
m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
m_CurrentInsertOffset = 0;
}
- uint64_t InsertOffset = m_CurrentInsertOffset;
- m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
- Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+ uint64_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
+ uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset;
+ Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
m_ActiveWriteBlocks.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
WriteBlock->Write(Data, Size, InsertOffset);
+ m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed);
Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
@@ -477,6 +475,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
});
m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
continue;
@@ -496,7 +495,6 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
if (NewBlockFile)
{
- NewBlockFile->Truncate(WriteOffset);
NewBlockFile->Flush();
NewBlockFile = nullptr;
}
@@ -566,12 +564,13 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
- WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ uint64_t OldOffset = WriteOffset;
+ WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+ m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed);
}
Chunk.clear();
if (NewBlockFile)
{
- NewBlockFile->Truncate(WriteOffset);
NewBlockFile->Flush();
NewBlockFile = nullptr;
}
@@ -596,6 +595,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
});
m_ChunkBlocks[BlockIndex] = nullptr;
ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+ m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed);
OldBlockFile->MarkAsDeleteOnClose();
}
}
@@ -606,6 +606,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
if (NewBlockFile)
{
ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+ m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed);
NewBlockFile->MarkAsDeleteOnClose();
}
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 519478356..d0c2f59ac 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -176,7 +176,6 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
m_LocationMap.emplace(ChunkHash, DiskLocation);
}
});
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
return CasStore::InsertResult{.New = true};
}
@@ -484,8 +483,6 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
if (Entry.Flags & CasDiskIndexEntry::kTombstone)
{
m_LocationMap.erase(Entry.Key);
- uint64_t ChunkSize = Entry.Location.GetSize();
- m_TotalSize.fetch_sub(ChunkSize);
continue;
}
m_LocationMap[Entry.Key] = Entry.Location;
@@ -686,7 +683,6 @@ void
CasContainerStrategy::OpenContainer(bool IsNewStore)
{
// Add .running file and delete on clean on close to detect bad termination
- m_TotalSize = 0;
m_LocationMap.clear();
@@ -710,7 +706,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
for (const auto& Entry : m_LocationMap)
{
const BlockStoreDiskLocation& Location = Entry.second;
- m_TotalSize.fetch_add(Location.GetSize(), std::memory_order::relaxed);
KnownLocations.push_back(Location.Get(m_PayloadAlignment));
}
@@ -991,8 +986,6 @@ TEST_CASE("compactcas.gc.compact")
CHECK(Cas.HaveChunk(ChunkHashes[7]));
CHECK(Cas.HaveChunk(ChunkHashes[8]));
- uint64_t InitialSize = Cas.StorageSize().DiskSize;
-
// Keep first and last
{
GcContext GcCtx;
@@ -1018,15 +1011,15 @@ TEST_CASE("compactcas.gc.compact")
CHECK(ChunkHashes[0] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[0])));
CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
- }
- Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
- Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
- Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
- Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
- Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
- Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
- Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ Cas.InsertChunk(Chunks[1], ChunkHashes[1]);
+ Cas.InsertChunk(Chunks[2], ChunkHashes[2]);
+ Cas.InsertChunk(Chunks[3], ChunkHashes[3]);
+ Cas.InsertChunk(Chunks[4], ChunkHashes[4]);
+ Cas.InsertChunk(Chunks[5], ChunkHashes[5]);
+ Cas.InsertChunk(Chunks[6], ChunkHashes[6]);
+ Cas.InsertChunk(Chunks[7], ChunkHashes[7]);
+ }
// Keep last
{
@@ -1177,9 +1170,6 @@ TEST_CASE("compactcas.gc.compact")
CHECK(ChunkHashes[6] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[6])));
CHECK(ChunkHashes[7] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[7])));
CHECK(ChunkHashes[8] == IoHash::HashBuffer(Cas.FindChunk(ChunkHashes[8])));
-
- uint64_t FinalSize = Cas.StorageSize().DiskSize;
- CHECK(InitialSize == FinalSize);
}
}
@@ -1346,7 +1336,8 @@ TEST_CASE("compactcas.threadedinsert")
WorkCompleted = 0;
const uint64_t TotalSize = Cas.StorageSize().DiskSize;
- CHECK_EQ(ExpectedSize, TotalSize);
+ CHECK_LE(ExpectedSize, TotalSize);
+ CHECK_GE(ExpectedSize + 32768, TotalSize);
{
for (const auto& Chunk : Chunks)
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 2acac7ca3..3d9c42c1b 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -63,7 +63,7 @@ struct CasContainerStrategy final : public GcStorage
void Flush();
void Scrub(ScrubContext& Ctx);
virtual void CollectGarbage(GcContext& GcCtx) override;
- virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::acquire)}; }
+ virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_BlockStore.TotalSize()}; }
private:
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
@@ -87,8 +87,6 @@ private:
RwLock m_LocationMapLock;
typedef std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap_t;
LocationMap_t m_LocationMap;
-
- std::atomic_uint64_t m_TotalSize{};
};
void compactcas_forcelink();
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 9825f225a..1b53c405b 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -97,32 +97,48 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
ZEN_INFO("read log {} containing {}", m_RootDirectory / "cas.ulog", NiceBytes(m_TotalSize.load(std::memory_order::relaxed)));
});
- std::unordered_set<IoHash> FoundEntries;
- FoundEntries.reserve(10000);
+ m_KnownEntries.reserve(10000);
m_CasLog.Replay(
[&](const FileCasIndexEntry& Entry) {
if (Entry.IsFlagSet(FileCasIndexEntry::kTombStone))
{
- if (!FoundEntries.contains(Entry.Key))
+ if (m_KnownEntries.erase(Entry.Key) == 1u)
{
- return;
+ m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
}
- m_TotalSize.fetch_sub(Entry.Size, std::memory_order_relaxed);
- FoundEntries.erase(Entry.Key);
}
else
{
- if (FoundEntries.contains(Entry.Key))
+ if (m_KnownEntries.insert(Entry.Key).second)
{
- return;
+ m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
}
- FoundEntries.insert(Entry.Key);
- m_TotalSize.fetch_add(Entry.Size, std::memory_order_relaxed);
}
},
0);
}
+#if ZEN_PLATFORM_WINDOWS
+static void
+DeletePayloadFileOnClose(const void* FileHandle)
+{
+ const HANDLE WinFileHandle = (const HANDLE)FileHandle;
+ // This will cause the file to be deleted when the last handle to it is closed
+ FILE_DISPOSITION_INFO Fdi{};
+ Fdi.DeleteFile = TRUE;
+ BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+
+ if (!Success)
+ {
+ // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
+ // to delete it.
+ ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'",
+ PathFromHandle(WinFileHandle),
+ GetLastErrorAsString());
+ }
+}
+#endif
+
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
@@ -134,10 +150,12 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if (Mode == CasStore::InsertMode::kCopyOnly)
{
- ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
- if (std::filesystem::is_regular_file(Name.ShardedPath.ToPath()))
{
- return {.New = false};
+ RwLock::SharedLockScope _(m_Lock);
+ if (m_KnownEntries.contains(ChunkHash))
+ {
+ return CasStore::InsertResult{.New = false};
+ }
}
return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
}
@@ -148,33 +166,40 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
+ {
+ bool Exists = true;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Exists = m_KnownEntries.contains(ChunkHash);
+ }
+ if (Exists)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ DeletePayloadFileOnClose(FileRef.FileHandle);
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
+ if (unlink(FilePath.c_str()) < 0)
+ {
+ int UnlinkError = zen::GetLastError();
+ if (UnlinkError != ENOENT)
+ {
+ ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+ FilePath.string(),
+ GetSystemErrorAsString(UnlinkError));
+ }
+ }
+#endif
+ return CasStore::InsertResult{.New = false};
+ }
+ }
+
ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
- RwLock::ExclusiveLockScope _(LockForHash(ChunkHash));
+ RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
#if ZEN_PLATFORM_WINDOWS
const HANDLE ChunkFileHandle = FileRef.FileHandle;
-
- auto DeletePayloadFileOnClose = [&] {
- // This will cause the file to be deleted when the last handle to it is closed
- FILE_DISPOSITION_INFO Fdi{};
- Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
-
- if (!Success)
- {
- // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
- // to delete it.
- ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'",
- PathFromHandle(ChunkFileHandle),
- GetLastErrorAsString());
- }
- };
-
// See if file already exists
- //
- // Future improvement: maintain Bloom filter to avoid expensive file system probes?
-
{
CAtlFile PayloadFile;
@@ -184,20 +209,33 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
// and should contain the content we were about to insert
// We do need to ensure the source file goes away on close, however
-
- uint64_t FileSize = 0;
- if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes))
+ size_t ChunkSize = Chunk.GetSize();
+ uint64_t FileSize = 0;
+ if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
{
- m_TotalSize.fetch_add(static_cast<int64_t>(FileSize));
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(Chunk.Size()), std::memory_order::relaxed);
+ }
+
+ DeletePayloadFileOnClose(ChunkFileHandle);
+
+ return CasStore::InsertResult{.New = IsNew};
}
else
{
- ZEN_WARN("get file size FAILED, file cas '{}'", Name.ShardedPath.ToUtf8());
+ ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+ Name.ShardedPath.ToUtf8(),
+ ChunkSize,
+ FileSize);
}
-
- DeletePayloadFileOnClose();
-
- return CasStore::InsertResult{.New = false};
}
else
{
@@ -240,7 +278,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
auto $ = MakeGuard([&] { Memory::Free(RenameInfo); });
// Try to move file into place
-
BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
if (!Success)
@@ -286,31 +323,53 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if (Success)
{
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
- return CasStore::InsertResult{.New = true};
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
+ }
+
+ return CasStore::InsertResult{.New = IsNew};
}
const DWORD LastError = GetLastError();
if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
{
- DeletePayloadFileOnClose();
+ HashLock.ReleaseNow();
+ DeletePayloadFileOnClose(ChunkFileHandle);
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
+ }
- return CasStore::InsertResult{.New = false};
+ return CasStore::InsertResult{.New = IsNew};
}
ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
GetSystemErrorAsString(LastError),
ChunkHash);
- DeletePayloadFileOnClose();
+ DeletePayloadFileOnClose(ChunkFileHandle);
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
- std::filesystem::path DestPath = Name.ShardedPath.c_str();
- int Ret = link(SourcePath.c_str(), DestPath.c_str());
+ std::filesystem::path DestPath = Name.ShardedPath.c_str();
+ int Ret = link(SourcePath.c_str(), DestPath.c_str());
if (Ret < 0 && zen::GetLastError() == ENOENT)
{
// Destination directory doesn't exist. Create it any try again.
@@ -319,14 +378,14 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
int LinkError = zen::GetLastError();
- // Unlink the file. If the path to unlink didn't exist someone else
- // beat us to it and that is hunky-dory.
if (unlink(SourcePath.c_str()) < 0)
{
int UnlinkError = zen::GetLastError();
if (UnlinkError != ENOENT)
{
- ZEN_WARN("unlink of CAS payload file failed ('{}')", GetSystemErrorAsString(UnlinkError));
+ ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
+ SourcePath.string(),
+ GetSystemErrorAsString(UnlinkError));
}
}
@@ -336,7 +395,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
{
if (LinkError == EEXIST)
{
- return CasStore::InsertResult{.New = false};
+ HashLock.ReleaseNow();
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
}
ZEN_WARN("link of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
@@ -345,7 +414,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
else
{
- return CasStore::InsertResult{.New = true};
+ HashLock.ReleaseNow();
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
}
#endif // ZEN_PLATFORM_*
}
@@ -361,8 +440,6 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
// See if file already exists
- //
- // Future improvement: maintain Bloom filter to avoid expensive file system probes?
#if ZEN_PLATFORM_WINDOWS
CAtlFile PayloadFile;
@@ -374,8 +451,16 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// If we succeeded in opening the file then we don't need to do anything else because it already exists and should contain the
// content we were about to insert
- m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize));
- return CasStore::InsertResult{.New = false};
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
}
PayloadFile.Close();
@@ -386,7 +471,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
}
#endif
- RwLock::ExclusiveLockScope _(LockForHash(ChunkHash));
+ RwLock::ExclusiveLockScope HashLock(LockForHash(ChunkHash));
#if ZEN_PLATFORM_WINDOWS
// For now, use double-checked locking to see if someone else was first
@@ -395,11 +480,31 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
if (SUCCEEDED(hRes))
{
- // If we succeeded in opening the file then we don't need to do anything
- // else because someone else managed to create the file before we did. Just return.
+ uint64_t FileSize = 0;
+ if (HRESULT hSizeRes = PayloadFile.GetSize(FileSize); SUCCEEDED(hSizeRes) && FileSize == ChunkSize)
+ {
+ // If we succeeded in opening the file and the size is correct then we don't need to do anything
+ // else because someone else managed to create the file before we did. Just return.
- m_TotalSize.fetch_add(static_cast<int64_t>(ChunkSize));
- return {.New = false};
+ HashLock.ReleaseNow();
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return CasStore::InsertResult{.New = IsNew};
+ }
+ else
+ {
+ ZEN_WARN("get file size FAILED or file size mismatch of file cas '{}'. Expected {}, found {}. Trying to overwrite",
+ Name.ShardedPath.ToUtf8(),
+ ChunkSize,
+ FileSize);
+ }
}
if ((hRes != HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) && (hRes != HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)))
@@ -441,7 +546,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
case EEXIST:
// Another thread has beat us to it so we're golden.
- return {.New = false};
+ {
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+ return {.New = IsNew};
+ }
+ break;
case ENOENT:
if (zen::CreateDirectories(std::string_view(Name.ShardedPath.c_str(), Name.Shard2len)))
@@ -455,7 +574,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
ThrowLastError(fmt::format("Failed creating shard directory '{}'", Name.ShardedPath));
default:
- ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath));
+ ThrowLastError(fmt::format("Unexpected error occurred opening shard file '{}'", Name.ShardedPath.ToUtf8()));
}
}
@@ -492,10 +611,21 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// *after* the lock is released due to the initialization order
PayloadFile.Close();
- m_TotalSize.fetch_add(ChunkSize, std::memory_order::relaxed);
m_CasLog.Append({.Key = ChunkHash, .Size = ChunkSize});
- return {.New = true};
+ HashLock.ReleaseNow();
+
+ bool IsNew = false;
+ {
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_KnownEntries.insert(ChunkHash).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
+ }
+
+ return {.New = IsNew};
}
IoBuffer
@@ -546,7 +676,10 @@ FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
if (!Ec)
{
- m_TotalSize.fetch_sub(FileSize);
+ if (m_KnownEntries.erase(ChunkHash) == 1u)
+ {
+ m_TotalSize.fetch_sub(FileSize, std::memory_order_relaxed);
+ }
m_CasLog.Append({.Key = ChunkHash, .Flags = FileCasIndexEntry::kTombStone, .Size = FileSize});
}
}
@@ -721,6 +854,7 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0};
std::vector<IoHash> CandidateCas;
+ CandidateCas.resize(1);
uint64_t DeletedCount = 0;
uint64_t OldTotalSize = m_TotalSize.load(std::memory_order::relaxed);
@@ -737,15 +871,22 @@ FileCasStrategy::CollectGarbage(GcContext& GcCtx)
});
IterateChunks([&](const IoHash& Hash, BasicFile& Payload) {
- bool KeepThis = false;
- CandidateCas.clear();
- CandidateCas.push_back(Hash);
+ bool KeepThis = false;
+ CandidateCas[0] = Hash;
GcCtx.FilterCids(CandidateCas, [&](const IoHash& Hash) {
ZEN_UNUSED(Hash);
KeepThis = true;
});
const uint64_t FileSize = Payload.FileSize();
+ // Is this a file we did not track previously?
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (m_KnownEntries.insert(Hash).second)
+ {
+ m_TotalSize.fetch_add(FileSize, std::memory_order_relaxed);
+ }
+ }
if (!KeepThis)
{
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index de79b8b81..29c28560e 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -33,7 +33,6 @@ struct FileCasStrategy final : public GcStorage
~FileCasStrategy();
void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
- CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
CasStore::InsertResult InsertChunk(IoBuffer Chunk,
const IoHash& ChunkHash,
CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);
@@ -46,13 +45,16 @@ struct FileCasStrategy final : public GcStorage
virtual GcStorageSize StorageSize() const override { return {.DiskSize = m_TotalSize.load(std::memory_order::relaxed)}; }
private:
- std::filesystem::path m_RootDirectory;
- RwLock m_Lock;
- RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
- spdlog::logger& m_Log;
- spdlog::logger& Log() { return m_Log; }
- std::atomic_uint64_t m_TotalSize{};
- bool m_IsInitialized = false;
+ CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
+
+ std::filesystem::path m_RootDirectory;
+ RwLock m_Lock;
+ std::unordered_set<IoHash, IoHash::Hasher> m_KnownEntries;
+ RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+ std::atomic_uint64_t m_TotalSize{};
+ bool m_IsInitialized = false;
struct FileCasIndexEntry
{
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 0902abf4a..8aac65bb4 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -908,7 +908,9 @@ TEST_CASE("gc.full")
CHECK(ChunkHashes[8] == IoHash::HashBuffer(CasStore->FindChunk(ChunkHashes[8])));
auto FinalSize = CasStore->TotalSize();
- CHECK(InitialSize.TinySize == FinalSize.TinySize);
+
+ CHECK_LE(InitialSize.TinySize, FinalSize.TinySize);
+ CHECK_GE(InitialSize.TinySize + (1u << 28), FinalSize.TinySize);
}
#endif
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 9f16063a0..5ef2d4694 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -96,7 +96,6 @@ struct BlockStoreFile : public RefCounted
IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
void Read(void* Data, uint64_t Size, uint64_t FileOffset);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Truncate(uint64_t Size);
void Flush();
BasicFile& GetBasicFile();
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
@@ -153,6 +152,8 @@ public:
static const char* GetBlockFileExtension();
static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);
+ inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
+
private:
std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
@@ -165,6 +166,8 @@ private:
uint64_t m_MaxBlockSize = 1u << 28;
uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;
std::filesystem::path m_BlocksBasePath;
+
+ std::atomic_uint64_t m_TotalSize{};
};
void blockstore_forcelink();