aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2025-05-30 11:51:05 +0200
committer: GitHub Enterprise <[email protected]> 2025-05-30 11:51:05 +0200
commit: 42aa2c9a8124ada33c88f5c608e4865b1ff84c64 (patch)
tree: 4c237a2e515f73a256ee290e8f9a113f0a2060f2 /src
parent: frequent disk space check (#407) (diff)
download: zen-42aa2c9a8124ada33c88f5c608e4865b1ff84c64.tar.xz
zen-42aa2c9a8124ada33c88f5c608e4865b1ff84c64.zip
faster oplog validate (#408)
Improvement: Faster oplog validate to reduce GC wall time and disk I/O pressure
Diffstat (limited to 'src')
-rw-r--r-- src/zenserver/projectstore/projectstore.cpp | 25
-rw-r--r-- src/zenstore/blockstore.cpp | 47
-rw-r--r-- src/zenstore/compactcas.cpp | 7
-rw-r--r-- src/zenstore/filecas.cpp | 20
-rw-r--r-- src/zenstore/include/zenstore/blockstore.h | 5
-rw-r--r-- src/zenutil/parallelwork.cpp | 2
6 files changed, 86 insertions, 20 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 7d22da717..3ec4373a2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1511,11 +1511,17 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
ValidationResult Result;
+ const size_t OpCount = OplogCount();
+
std::vector<Oid> KeyHashes;
std::vector<std::string> Keys;
std::vector<std::vector<IoHash>> Attachments;
std::vector<OplogEntryMapping> Mappings;
+ KeyHashes.reserve(OpCount);
+ Keys.reserve(OpCount);
+ Mappings.reserve(OpCount);
+
IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) {
Result.LSNLow = Min(Result.LSNLow, LSN);
Result.LSNHigh = Max(Result.LSNHigh, LSN);
@@ -1540,20 +1546,22 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
bool HasMissingEntries = false;
for (const ChunkMapping& Chunk : Mapping.Chunks)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload)
+ if (!m_CidStore.ContainsChunk(Chunk.Hash))
{
ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); });
HasMissingEntries = true;
}
}
+
for (const ChunkMapping& Meta : Mapping.Meta)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload)
+ if (!m_CidStore.ContainsChunk(Meta.Hash))
{
ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); });
HasMissingEntries = true;
}
}
+
for (const FileMapping& File : Mapping.Files)
{
if (File.Hash == IoHash::Zero)
@@ -1565,24 +1573,23 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
HasMissingEntries = true;
}
}
- else
+ else if (!m_CidStore.ContainsChunk(File.Hash))
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload)
- {
- ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
- HasMissingEntries = true;
- }
+ ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
+ HasMissingEntries = true;
}
}
+
const std::vector<IoHash>& OpAttachments = Attachments[OpIndex];
for (const IoHash& Attachment : OpAttachments)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload)
+ if (!m_CidStore.ContainsChunk(Attachment))
{
ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); });
HasMissingEntries = true;
}
}
+
if (HasMissingEntries)
{
ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); });
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index e0f371061..86dbcc971 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -153,14 +153,28 @@ void
BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
{
ZEN_TRACE_CPU("BlockStoreFile::Write");
+#if ZEN_BUILD_DEBUG
+ if (uint64_t CachedFileSize = m_CachedFileSize.load(); CachedFileSize > 0)
+ {
+ ZEN_ASSERT(FileOffset + Size <= CachedFileSize);
+ }
+#endif // ZEN_BUILD_DEBUG
m_File.Write(Data, Size, FileOffset);
}
+// Flushes the underlying file and, when FinalSize is supplied (anything other than
+// (uint64_t)-1), publishes it as the cached file size.
+// The cached size is only written while it is still unset (0); if a size has already
+// been cached it must agree with FinalSize.
void
-BlockStoreFile::Flush()
+BlockStoreFile::Flush(uint64_t FinalSize)
{
ZEN_TRACE_CPU("BlockStoreFile::Flush");
m_File.Flush();
+	if (FinalSize != (uint64_t)-1)
+	{
+		// Must be the strong variant: compare_exchange_weak is allowed to fail spuriously
+		// while the cached size is still 0, which would leave the size unpublished and
+		// trip the assert below for any non-zero FinalSize.
+		uint64_t ExpectedSize = 0;
+		if (!m_CachedFileSize.compare_exchange_strong(ExpectedSize, FinalSize))
+		{
+			// On failure ExpectedSize holds the value that was already cached.
+			ZEN_ASSERT(ExpectedSize == FinalSize);
+		}
+	}
}
BasicFile&
@@ -540,7 +554,7 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint32_t Alignment, cons
{
if (m_WriteBlock)
{
- m_WriteBlock->Flush();
+ m_WriteBlock->Flush(m_CurrentInsertOffset);
m_WriteBlock = nullptr;
}
@@ -674,6 +688,27 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
}
}
+// Returns true when the block referenced by Location is present in m_ChunkBlocks and the
+// chunk's byte range (Offset + Size) fits inside that block file's size. The chunk
+// payload itself is not read.
+bool
+BlockStore::HasChunk(const BlockStoreLocation& Location) const
+{
+	ZEN_TRACE_CPU("BlockStore::HasChunk");
+	RwLock::SharedLockScope InsertLock(m_InsertLock);
+	if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
+	{
+		// Take a copy of the Ref rather than a reference into m_ChunkBlocks: the lock is
+		// released below, so a reference to the map entry could dangle if the entry is
+		// removed, while a copied Ref keeps the block file alive for the size check.
+		if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
+		{
+			InsertLock.ReleaseNow();
+
+			const uint64_t BlockSize = Block->FileSize();
+			if (Location.Offset + Location.Size <= BlockSize)
+			{
+				return true;
+			}
+		}
+	}
+	return false;
+}
+
IoBuffer
BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
{
@@ -706,7 +741,7 @@ BlockStore::Flush(bool ForceNewBlock)
{
if (m_WriteBlock)
{
- m_WriteBlock->Flush();
+ m_WriteBlock->Flush(m_CurrentInsertOffset);
}
m_WriteBlock = nullptr;
m_CurrentInsertOffset = 0;
@@ -1097,7 +1132,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
if (NewBlockFile)
{
ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
+ NewBlockFile->Flush(WriteOffset);
uint64_t NewBlockSize = NewBlockFile->FileSize();
MovedSize += NewBlockSize;
NewBlockFile = nullptr;
@@ -1228,7 +1263,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
if (NewBlockFile)
{
ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
+ NewBlockFile->Flush(WriteOffset);
uint64_t NewBlockSize = NewBlockFile->FileSize();
MovedSize += NewBlockSize;
NewBlockFile = nullptr;
@@ -1359,6 +1394,8 @@ TEST_CASE("blockstore.blockfile")
CHECK(std::string(Boop) == "boop");
File1.Flush();
CHECK(File1.FileSize() == 10);
+ File1.Flush(10);
+ CHECK(File1.FileSize() == 10);
}
{
BlockStoreFile File1(RootDirectory / "1");
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 15bea272b..730bdf143 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -307,7 +307,12 @@ bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
RwLock::SharedLockScope _(m_LocationMapLock);
- return m_LocationMap.contains(ChunkHash);
+ if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
+ {
+ const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ return m_BlockStore.HasChunk(Location);
+ }
+ return false;
}
void
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 6354edf70..56979f267 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -564,8 +564,24 @@ FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
ZEN_ASSERT(m_IsInitialized);
- RwLock::SharedLockScope _(m_Lock);
- return m_Index.contains(ChunkHash);
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It == m_Index.end())
+ {
+ return false;
+ }
+ }
+
+ ShardingHelper Name(m_RootDirectory, ChunkHash);
+ const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
+ RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
+
+ if (IsFile(ChunkPath))
+ {
+ return true;
+ }
+
+ return false;
}
void
diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h
index 0c72a13aa..b0713fea1 100644
--- a/src/zenstore/include/zenstore/blockstore.h
+++ b/src/zenstore/include/zenstore/blockstore.h
@@ -94,7 +94,7 @@ struct BlockStoreFile : public RefCounted
IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
void Read(void* Data, uint64_t Size, uint64_t FileOffset);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Flush();
+ void Flush(uint64_t FinalSize = (uint64_t)-1);
BasicFile& GetBasicFile();
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
bool IsOpen() const;
@@ -107,7 +107,7 @@ private:
const std::filesystem::path m_Path;
IoBuffer m_IoBuffer;
BasicFile m_File;
- uint64_t m_CachedFileSize = 0;
+ std::atomic<uint64_t> m_CachedFileSize = 0;
};
class BlockStoreCompactState;
@@ -158,6 +158,7 @@ public:
typedef std::function<void(std::span<BlockStoreLocation> Locations)> WriteChunksCallback;
void WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, const WriteChunksCallback& Callback);
+ bool HasChunk(const BlockStoreLocation& Location) const;
IoBuffer TryGetChunk(const BlockStoreLocation& Location) const;
void Flush(bool ForceNewBlock);
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index 91591375a..ecacc4b5a 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -85,7 +85,7 @@ ParallelWork::RethrowErrors()
{
if (m_Errors.size() > 1)
{
- ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:");
+ ZEN_INFO("Multiple exceptions thrown during ParallelWork execution, dropping the following exceptions:");
auto It = m_Errors.begin() + 1;
while (It != m_Errors.end())
{