about summary refs log tree commit diff
path: root/src/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore')
-rw-r--r--  src/zenstore/blockstore.cpp   11
-rw-r--r--  src/zenstore/compactcas.cpp   58
2 files changed, 39 insertions, 30 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index f99b0bc4a..968e919d6 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -171,6 +171,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
if (std::filesystem::is_directory(m_BlocksBasePath))
{
+ uint32_t NextBlockIndex = 0;
std::vector<std::filesystem::path> FoldersToScan;
FoldersToScan.push_back(m_BlocksBasePath);
size_t FolderOffset = 0;
@@ -202,10 +203,15 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max
m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed);
m_ChunkBlocks[BlockIndex] = BlockFile;
FoundBlocks[BlockIndex] = BlockFile->FileSize();
+ if (BlockIndex >= NextBlockIndex)
+ {
+ NextBlockIndex = (BlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
}
}
++FolderOffset;
}
+ m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release);
}
else
{
@@ -363,14 +369,11 @@ BlockStore::Flush(bool ForceNewBlock)
RwLock::ExclusiveLockScope _(m_InsertLock);
if (m_CurrentInsertOffset > 0)
{
- uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
- WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
if (m_WriteBlock)
{
m_WriteBlock->Flush();
}
- m_WriteBlock = nullptr;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ m_WriteBlock = nullptr;
m_CurrentInsertOffset = 0;
}
return;
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index e6383c3a1..715704c2e 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -436,10 +436,10 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::SharedLockScope ___(m_LocationMapLock);
Stopwatch Timer;
- const auto ____ = MakeGuard([&Timer, &WriteBlockTimeUs, &WriteBlockLongestTimeUs] {
+ const auto ____ = MakeGuard([&Timer, &ReadBlockTimeUs, &ReadBlockLongestTimeUs] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- WriteBlockTimeUs += ElapsedUs;
- WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+ ReadBlockTimeUs += ElapsedUs;
+ ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
});
LocationMap = m_LocationMap;
Locations = m_Locations;
@@ -496,41 +496,47 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
[&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& RemovedChunks) {
std::vector<CasDiskIndexEntry> LogEntries;
LogEntries.reserve(MovedChunks.size() + RemovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- LogEntries.push_back({.Key = ChunkHash, .Location = {NewLocation, m_PayloadAlignment}});
- }
- for (const size_t ChunkIndex : RemovedChunks)
- {
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const BlockStoreDiskLocation& OldDiskLocation = Locations[LocationMap[ChunkHash]];
- LogEntries.push_back({.Key = ChunkHash, .Location = OldDiskLocation, .Flags = CasDiskIndexEntry::kTombstone});
- DeletedChunks.push_back(ChunkHash);
- }
-
- m_CasLog.Append(LogEntries);
- m_CasLog.Flush();
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
Stopwatch Timer;
const auto ____ = MakeGuard([&] {
uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
- ReadBlockTimeUs += ElapsedUs;
- ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+ WriteBlockTimeUs += ElapsedUs;
+ WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
- for (const CasDiskIndexEntry& Entry : LogEntries)
+ for (const auto& Entry : MovedChunks)
{
- if (Entry.Flags & CasDiskIndexEntry::kTombstone)
+ size_t ChunkIndex = Entry.first;
+ const BlockStoreLocation& NewLocation = Entry.second;
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t LocationIndex = m_LocationMap[ChunkHash];
+ BlockStoreDiskLocation& Location = m_Locations[LocationIndex];
+ if (Locations[LocationMap[ChunkHash]] != Location)
{
- m_LocationMap.erase(Entry.Key);
+ // Entry has been updated while GC was running, ignore the move
continue;
}
- m_Locations[m_LocationMap[Entry.Key]] = Entry.Location;
+ Location = {NewLocation, m_PayloadAlignment};
+ LogEntries.push_back({.Key = ChunkHash, .Location = Location});
+ }
+ for (const size_t ChunkIndex : RemovedChunks)
+ {
+ const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
+ size_t LocationIndex = m_LocationMap[ChunkHash];
+ const BlockStoreDiskLocation& Location = Locations[LocationIndex];
+ if (Locations[LocationMap[ChunkHash]] != Location)
+ {
+ // Entry has been updated while GC was running, ignore the delete
+ continue;
+ }
+ LogEntries.push_back({.Key = ChunkHash, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone});
+ m_LocationMap.erase(ChunkHash);
+ DeletedChunks.push_back(ChunkHash);
}
}
+
+ m_CasLog.Append(LogEntries);
+ m_CasLog.Flush();
},
[&GcCtx]() { return GcCtx.CollectSmallObjects(); });