aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2026-03-09 18:25:30 -0700
committerLiam Mitchell <[email protected]>2026-03-09 18:25:30 -0700
commit57c1683b2935c834250b73eb506319ed67946160 (patch)
tree1fc8f237010b26e65659b731fe6f6eae30422f5c /src/zenstore
parentAllow external OidcToken executable to be specified unless disabled via comma... (diff)
parentreduce lock time for project store gc precache and gc validate (#750) (diff)
downloadzen-57c1683b2935c834250b73eb506319ed67946160.tar.xz
zen-57c1683b2935c834250b73eb506319ed67946160.zip
Merge branch 'main' into lm/oidctoken-exe-path
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp28
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp142
-rw-r--r--src/zenstore/cache/cacherpc.cpp19
-rw-r--r--src/zenstore/cas.h2
-rw-r--r--src/zenstore/compactcas.cpp37
-rw-r--r--src/zenstore/include/zenstore/caslog.h4
-rw-r--r--src/zenstore/include/zenstore/projectstore.h10
-rw-r--r--src/zenstore/projectstore.cpp301
8 files changed, 402 insertions, 141 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index f97c98e08..3ea91ead6 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -374,7 +374,7 @@ BlockStoreFile::GetMetaPath() const
////////////////////////////////////////////////////////
-constexpr uint64_t DefaultIterateSmallChunkWindowSize = 2 * 1024 * 1024;
+constexpr uint64_t DefaultIterateSmallChunkWindowSize = 512u * 1024u;
BlockStore::BlockStore()
{
@@ -762,7 +762,7 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
LargestSize = Max(LargestSize, Size);
}
- const uint64_t MinSize = Max(LargestSize, 8u * 1024u * 1024u);
+ const uint64_t MinSize = Max(LargestSize, 512u * 1024u);
const uint64_t BufferSize = Min(TotalSize, MinSize);
std::vector<uint8_t> Buffer(BufferSize);
@@ -815,7 +815,12 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
auto _ = MakeGuard([this, WriteBlockIndex]() { RemoveActiveWriteBlock(WriteBlockIndex); });
+ if (Count > 1)
{
+ if (Buffer.empty())
+ {
+ Buffer.resize(BufferSize);
+ }
MutableMemoryView WriteBuffer(Buffer.data(), RangeSize);
for (size_t Index = 0; Index < Count; Index++)
{
@@ -824,9 +829,14 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
WriteBuffer.MidInline(RoundUp(SourceBuffer.GetSize(), Alignment));
}
WriteBlock->Write(Buffer.data(), RangeSize, AlignedInsertOffset);
+ m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ MemoryView SourceBuffer = Datas[Offset];
+ WriteBlock->Write(SourceBuffer.GetData(), SourceBuffer.GetSize(), AlignedInsertOffset);
+ m_TotalSize.fetch_add(SourceBuffer.GetSize(), std::memory_order::relaxed);
}
-
- m_TotalSize.fetch_add(RangeSize, std::memory_order::relaxed);
uint32_t ChunkOffset = AlignedInsertOffset;
std::vector<BlockStoreLocation> Locations(Count);
@@ -845,11 +855,11 @@ BlockStore::WriteChunks(std::span<const IoBuffer> Datas, uint32_t Alignment, con
bool
BlockStore::HasChunk(const BlockStoreLocation& Location) const
{
- ZEN_TRACE_CPU("BlockStore::TryGetChunk");
+ ZEN_TRACE_CPU("BlockStore::HasChunk");
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
InsertLock.ReleaseNow();
@@ -878,8 +888,10 @@ BlockStore::TryGetChunk(const BlockStoreLocation& Location) const
RwLock::SharedLockScope InsertLock(m_InsertLock);
if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
{
- if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block)
+ if (Ref<BlockStoreFile> Block = BlockIt->second; Block)
{
+ InsertLock.ReleaseNow();
+
IoBuffer Chunk = Block->GetChunk(Location.Offset, Location.Size);
if (Chunk.GetSize() == Location.Size)
{
@@ -941,7 +953,7 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations,
uint64_t IterateSmallChunkWindowSize = Max(DefaultIterateSmallChunkWindowSize, LargeSizeLimit);
- const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 512u);
+ const uint64_t IterateSmallChunkMaxGapSize = Max(2048u, IterateSmallChunkWindowSize / 256u);
IterateSmallChunkWindowSize = Min((LargeSizeLimit + IterateSmallChunkMaxGapSize) * ChunkLocations.size(), IterateSmallChunkWindowSize);
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index b2e045632..ead7e4f3a 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -2410,74 +2410,95 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
try
{
- std::vector<BlockStoreLocation> ChunkLocations;
- std::vector<IoHash> ChunkIndexToChunkHash;
+ std::vector<DiskLocation> ChunkLocations;
+ std::vector<IoHash> ChunkIndexToChunkHash;
+ std::vector<DiskLocation> StandaloneLocations;
+ std::vector<IoHash> StandaloneIndexToKeysHash;
- RwLock::SharedLockScope _(m_IndexLock);
+ {
+ RwLock::SharedLockScope _(m_IndexLock);
- const size_t BlockChunkInitialCount = m_Index.size() / 4;
- ChunkLocations.reserve(BlockChunkInitialCount);
- ChunkIndexToChunkHash.reserve(BlockChunkInitialCount);
+ const size_t InitialCount = m_Index.size() / 4;
+ ChunkLocations.reserve(InitialCount);
+ ChunkIndexToChunkHash.reserve(InitialCount);
+ StandaloneLocations.reserve(InitialCount);
+ StandaloneIndexToKeysHash.reserve(InitialCount);
- // Do a pass over the index and verify any standalone file values straight away
- // all other storage classes are gathered and verified in bulk in order to enable
- // more efficient I/O scheduling
+ for (auto& Kv : m_Index)
+ {
+ const IoHash& HashKey = Kv.first;
+ const BucketPayload& Payload = m_Payloads[Kv.second];
+ const DiskLocation& Loc = Payload.Location;
- for (auto& Kv : m_Index)
- {
- const IoHash& HashKey = Kv.first;
- const BucketPayload& Payload = m_Payloads[Kv.second];
- const DiskLocation& Loc = Payload.Location;
+ Ctx.ThrowIfDeadlineExpired();
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneLocations.push_back(Loc);
+ StandaloneIndexToKeysHash.push_back(HashKey);
+ }
+ else
+ {
+ ChunkLocations.push_back(Loc);
+ ChunkIndexToChunkHash.push_back(HashKey);
+ }
+ }
+ }
+
+ for (size_t StandaloneKeyIndex = 0; StandaloneKeyIndex < StandaloneIndexToKeysHash.size(); StandaloneKeyIndex++)
+ {
Ctx.ThrowIfDeadlineExpired();
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- ChunkCount.fetch_add(1);
- VerifiedChunkBytes.fetch_add(Loc.Size());
+ const IoHash& HashKey = StandaloneIndexToKeysHash[StandaloneKeyIndex];
+ const DiskLocation& Loc = StandaloneLocations[StandaloneKeyIndex];
- if (Loc.GetContentType() == ZenContentType::kBinary)
- {
- // Blob cache value, not much we can do about data integrity checking
- // here since there's no hash available
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
+ ChunkCount.fetch_add(1);
+ VerifiedChunkBytes.fetch_add(Loc.Size());
- RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+ if (Loc.GetContentType() == ZenContentType::kBinary)
+ {
+ // Blob cache value, not much we can do about data integrity checking
+ // here since there's no hash available
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
- std::error_code Ec;
- uintmax_t size = FileSizeFromPath(DataFilePath.ToPath(), Ec);
- if (Ec)
- {
- ReportBadKey(HashKey);
- }
- if (size != Loc.Size())
- {
- ReportBadKey(HashKey);
- }
- continue;
+ RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
+
+ std::error_code Ec;
+ uintmax_t Size = FileSizeFromPath(DataFilePath.ToPath(), Ec);
+ if (Ec)
+ {
+ ReportBadKey(HashKey);
}
- else
+ ValueLock.ReleaseNow();
+
+ if (Size != Loc.Size())
{
- // Structured cache value
- IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
- if (!Buffer)
+ // Make sure we verify that size hasn't changed behind our back...
+ RwLock::SharedLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
- ReportBadKey(HashKey);
- continue;
- }
- if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer)))
- {
- ReportBadKey(HashKey);
- continue;
+ const BucketPayload& Payload = m_Payloads[It->second];
+ const DiskLocation& CurrentLoc = Payload.Location;
+ if (Size != CurrentLoc.Size())
+ {
+ ReportBadKey(HashKey);
+ }
}
}
}
else
{
- ChunkLocations.emplace_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
- ChunkIndexToChunkHash.push_back(HashKey);
- continue;
+ // Structured cache value
+ IoBuffer Buffer = GetStandaloneCacheValue(Loc, HashKey);
+ if (!Buffer)
+ {
+ ReportBadKey(HashKey);
+ }
+ else if (!ValidateIoBuffer(Loc.GetContentType(), std::move(Buffer)))
+ {
+ ReportBadKey(HashKey);
+ }
}
}
@@ -2502,8 +2523,9 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ReportBadKey(Hash);
return true;
}
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
+
+ const DiskLocation& Loc = ChunkLocations[ChunkIndex];
+ ZenContentType ContentType = Loc.GetContentType();
Buffer.SetContentType(ContentType);
if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
@@ -2525,8 +2547,8 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
ReportBadKey(Hash);
return true;
}
- const BucketPayload& Payload = m_Payloads[m_Index.at(Hash)];
- ZenContentType ContentType = Payload.Location.GetContentType();
+ const DiskLocation& Loc = ChunkLocations[ChunkIndex];
+ ZenContentType ContentType = Loc.GetContentType();
Buffer.SetContentType(ContentType);
if (!ValidateIoBuffer(ContentType, std::move(Buffer)))
{
@@ -2536,8 +2558,16 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx)
return true;
};
- m_BlockStore.IterateChunks(ChunkLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
- return m_BlockStore.IterateBlock(ChunkLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
+ std::vector<BlockStoreLocation> ChunkBlockLocations;
+ ChunkBlockLocations.reserve(ChunkLocations.size());
+
+ for (const DiskLocation& Loc : ChunkLocations)
+ {
+ ChunkBlockLocations.push_back(Loc.GetBlockLocation(m_Configuration.PayloadAlignment));
+ }
+
+ m_BlockStore.IterateChunks(ChunkBlockLocations, [&](uint32_t, std::span<const size_t> ChunkIndexes) {
+ return m_BlockStore.IterateBlock(ChunkBlockLocations, ChunkIndexes, ValidateSmallChunk, ValidateLargeChunk, 0);
});
}
catch (ScrubDeadlineExpiredException&)
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 660c66b9a..94abcf547 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -594,16 +594,16 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
{
FoundLocalInvalid = true;
}
- else if (CbValidateError Error = ValidateCompactBinary(Request.RecordCacheValue.GetView(), CbValidateMode::Default);
- Error != CbValidateError::None)
+ else if (CbObjectView RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
+ RecordObject.GetSize() != Request.RecordCacheValue.GetSize())
{
ZEN_WARN("HandleRpcGetCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'",
- ToString(Error));
+ "Object size does not match payload size");
FoundLocalInvalid = true;
}
else
{
- Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
+ Request.RecordObject = std::move(RecordObject);
ParseValues(Request);
Request.Complete = true;
@@ -1710,16 +1710,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
Record.ValuesRead = true;
if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject)
{
- if (CbValidateError Error = ValidateCompactBinary(Record.CacheValue.GetView(), CbValidateMode::Default);
- Error != CbValidateError::None)
+ if (CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
+ RecordObject.GetSize() != Record.CacheValue.GetSize())
{
- ZEN_WARN("GetLocalCacheRecords stored record for is corrupt, compact binary format validation failed. Reason: '{}'",
- ToString(Error));
+ ZEN_WARN("GetLocalCacheRecords stored record is corrupt, compact binary format validation failed. Reason: '{}'",
+ "Object size does not match payload size");
}
else
{
- CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData());
- CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
Record.Values.reserve(ValuesArray.Num());
for (CbFieldView ValueField : ValuesArray)
{
diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h
index 0f6e2ba9d..47b6e63cc 100644
--- a/src/zenstore/cas.h
+++ b/src/zenstore/cas.h
@@ -59,7 +59,7 @@ protected:
CidStoreConfiguration m_Config;
};
-ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
+std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc);
void CAS_forcelink();
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index a5de5c448..5d8f95c9e 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -301,13 +301,14 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
ZEN_TRACE_CPU("CasContainer::FindChunk");
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
auto KeyIt = m_LocationMap.find(ChunkHash);
if (KeyIt == m_LocationMap.end())
{
return IoBuffer();
}
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
IoBuffer Chunk = m_BlockStore.TryGetChunk(Location);
return Chunk;
@@ -316,10 +317,11 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
bool
CasContainerStrategy::HaveChunk(const IoHash& ChunkHash)
{
- RwLock::SharedLockScope _(m_LocationMapLock);
+ RwLock::SharedLockScope Lock(m_LocationMapLock);
if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end())
{
- const BlockStoreLocation& Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ const BlockStoreLocation Location = m_Locations[KeyIt->second].Get(m_PayloadAlignment);
+ Lock.ReleaseNow();
return m_BlockStore.HasChunk(Location);
}
return false;
@@ -545,11 +547,11 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
if (Ctx.IsSkipCas())
{
- ZEN_INFO("SKIPPED scrubbing: '{}'", m_BlocksBasePath);
+ ZEN_INFO("SKIPPED scrubbing: '{}'", m_RootDirectory);
return;
}
- ZEN_INFO("scrubbing '{}'", m_BlocksBasePath);
+ ZEN_INFO("scrubbing '{}'", m_RootDirectory);
RwLock BadKeysLock;
std::vector<IoHash> BadKeys;
@@ -563,20 +565,21 @@ CasContainerStrategy::ScrubStorage(ScrubContext& Ctx)
try
{
- RwLock::SharedLockScope _(m_LocationMapLock);
-
- uint64_t TotalChunkCount = m_LocationMap.size();
- ChunkLocations.reserve(TotalChunkCount);
- ChunkIndexToChunkHash.reserve(TotalChunkCount);
{
- for (const auto& Entry : m_LocationMap)
+ uint64_t TotalChunkCount = m_LocationMap.size();
+ ChunkLocations.reserve(TotalChunkCount);
+ ChunkIndexToChunkHash.reserve(TotalChunkCount);
+ RwLock::SharedLockScope _(m_LocationMapLock);
{
- const IoHash& ChunkHash = Entry.first;
- const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
- BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
+ for (const auto& Entry : m_LocationMap)
+ {
+ const IoHash& ChunkHash = Entry.first;
+ const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second];
+ BlockStoreLocation Location = DiskLocation.Get(m_PayloadAlignment);
- ChunkLocations.push_back(Location);
- ChunkIndexToChunkHash.push_back(ChunkHash);
+ ChunkLocations.push_back(Location);
+ ChunkIndexToChunkHash.push_back(ChunkHash);
+ }
}
}
diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h
index 3d95c9c90..f3dd32fb1 100644
--- a/src/zenstore/include/zenstore/caslog.h
+++ b/src/zenstore/include/zenstore/caslog.h
@@ -41,8 +41,8 @@ private:
static const inline uint8_t MagicSequence[16] = {'.', '-', '=', ' ', 'C', 'A', 'S', 'L', 'O', 'G', 'v', '1', ' ', '=', '-', '.'};
- ZENCORE_API uint32_t ComputeChecksum();
- void Finalize() { Checksum = ComputeChecksum(); }
+ uint32_t ComputeChecksum();
+ void Finalize() { Checksum = ComputeChecksum(); }
};
static_assert(sizeof(FileHeader) == 64);
diff --git a/src/zenstore/include/zenstore/projectstore.h b/src/zenstore/include/zenstore/projectstore.h
index 09c3096ad..33ef996db 100644
--- a/src/zenstore/include/zenstore/projectstore.h
+++ b/src/zenstore/include/zenstore/projectstore.h
@@ -238,6 +238,16 @@ public:
std::atomic_bool& IsCancelledFlag,
WorkerThreadPool* OptionalWorkerPool);
+ struct OplogSnapshot
+ {
+ std::vector<CbObjectView> Ops;
+ std::vector<Oid> Keys;
+ std::vector<LogSequenceNumber> LSNs;
+ std::vector<IoBuffer> PayloadBuffers;
+ };
+
+ OplogSnapshot GetSnapshotLocked();
+
private:
struct FileMapEntry
{
diff --git a/src/zenstore/projectstore.cpp b/src/zenstore/projectstore.cpp
index f1001f665..1ab2b317a 100644
--- a/src/zenstore/projectstore.cpp
+++ b/src/zenstore/projectstore.cpp
@@ -22,6 +22,8 @@
#include "referencemetadata.h"
+#include <numeric>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
#include <xxh3.h>
@@ -861,9 +863,9 @@ struct ProjectStore::OplogStorage : public RefCounted
}
}
- void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
- const std::span<const Oplog::PayloadIndex> Order,
- std::function<void(LogSequenceNumber Lsn, const IoBuffer& Buffer)>&& Handler)
+ void ReplayLogEntries(const std::span<const Oplog::OplogPayload> Entries,
+ const std::span<const Oplog::PayloadIndex> Order,
+ std::function<void(LogSequenceNumber Lsn, IoBuffer&& Buffer)>&& Handler)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::OplogStorage::ReplayLogEntries");
@@ -886,13 +888,13 @@ struct ProjectStore::OplogStorage : public RefCounted
if (OpBufferView.GetSize() == Entry.Address.Size)
{
IoBuffer Buffer = IoBuffer(IoBuffer::Wrap, OpBufferView.GetData(), OpBufferView.GetSize());
- Handler(Entry.Lsn, Buffer);
+ Handler(Entry.Lsn, std::move(Buffer));
}
else
{
IoBuffer OpBuffer(Entry.Address.Size);
OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Address.Size, OpFileOffset);
- Handler(Entry.Lsn, OpBuffer);
+ Handler(Entry.Lsn, std::move(OpBuffer));
}
}
}
@@ -1645,18 +1647,47 @@ ProjectStore::Oplog::Validate(const std::filesystem::path& ProjectRootDir,
Keys.reserve(OpCount);
Mappings.reserve(OpCount);
- IterateOplogWithKey([&](LogSequenceNumber LSN, const Oid& Key, CbObjectView OpView) {
- Result.LSNLow = Min(Result.LSNLow, LSN);
- Result.LSNHigh = Max(Result.LSNHigh, LSN);
- KeyHashes.push_back(Key);
- Keys.emplace_back(std::string(OpView["key"sv].AsString()));
+ {
+ Stopwatch SnapshotTimer;
+ RwLock::SharedLockScope OplogLock(m_OplogLock);
+ ProjectStore::Oplog::OplogSnapshot Snapshot = GetSnapshotLocked();
+ OplogLock.ReleaseNow();
- std::vector<IoHash> OpAttachments;
- OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); });
- Attachments.emplace_back(std::move(OpAttachments));
+ uint64_t AllocatedSize = std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(), Snapshot.Ops.end(), uint64_t(0), [](uint64_t Current, const CbObjectView& Object) {
+ return Current + Object.GetSize();
+ });
- Mappings.push_back(GetMapping(OpView));
- });
+ ZEN_INFO("Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' in {}",
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_OuterProjectId,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& OpView = Snapshot.Ops[Index];
+ LogSequenceNumber LSN = Snapshot.LSNs[Index];
+ const Oid& Key = Snapshot.Keys[Index];
+
+ Result.LSNLow = Min(Result.LSNLow, LSN);
+ Result.LSNHigh = Max(Result.LSNHigh, LSN);
+ KeyHashes.push_back(Key);
+ Keys.emplace_back(std::string(OpView["key"sv].AsString()));
+
+ std::vector<IoHash> OpAttachments;
+ OpView.IterateAttachments([&OpAttachments](CbFieldView Attachment) { OpAttachments.push_back(Attachment.AsAttachment()); });
+ Attachments.emplace_back(std::move(OpAttachments));
+
+ Mappings.push_back(GetMapping(OpView));
+ }
+ }
Result.OpCount = gsl::narrow<uint32_t>(Keys.size());
@@ -2644,6 +2675,104 @@ ProjectStore::Oplog::GetSortedOpPayloadRangeLocked(const Paging& Entry
return ReplayOrder;
}
+ProjectStore::Oplog::OplogSnapshot
+ProjectStore::Oplog::GetSnapshotLocked()
+{
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ ZEN_TRACE_CPU("Store::Oplog::GetSnapshotLocked");
+ if (!m_Storage)
+ {
+ return {};
+ }
+
+ uint64_t WriteOffset = 0;
+ OplogSnapshot Snapshot;
+
+ const uint64_t PayloadBufferPageSize = 64u * 1024u;
+
+ size_t OpCount = GetOplogEntryCount();
+ Snapshot.Ops.reserve(OpCount);
+ Snapshot.Keys.reserve(OpCount);
+ Snapshot.LSNs.reserve(OpCount);
+
+ tsl::robin_map<PayloadIndex, Oid, PayloadIndex::Hasher> ReverseKeyMap;
+ std::vector<PayloadIndex> ReplayOrder = GetSortedOpPayloadRangeLocked(Paging{}, &ReverseKeyMap);
+ if (!ReplayOrder.empty())
+ {
+ uint32_t EntryIndex = 0;
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber LSN, IoBuffer&& Buffer) {
+ const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
+
+ Snapshot.Keys.push_back(ReverseKeyMap.at(PayloadOffset));
+ Snapshot.LSNs.push_back(LSN);
+
+ uint64_t Size = Buffer.GetSize();
+ if (Buffer.IsOwned())
+ {
+ CbObjectView CopyView(Buffer.GetData());
+ if (CopyView.GetSize() == Size)
+ {
+ Snapshot.Ops.emplace_back(std::move(CopyView));
+ }
+ else
+ {
+ Snapshot.Ops.emplace_back(CbObjectView{});
+ }
+ if (Snapshot.PayloadBuffers.empty())
+ {
+ Snapshot.PayloadBuffers.emplace_back(std::move(Buffer));
+ WriteOffset = Snapshot.PayloadBuffers.back().Size();
+ }
+ else
+ {
+ Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(Buffer));
+ }
+ }
+ else
+ {
+ uint64_t AvailableSize = Snapshot.PayloadBuffers.empty() ? 0 : Snapshot.PayloadBuffers.back().GetSize() - WriteOffset;
+ MutableMemoryView WriteBuffer;
+ if (Size > AvailableSize)
+ {
+ if (Size >= PayloadBufferPageSize && !Snapshot.PayloadBuffers.empty())
+ {
+ // Insert the large payload before the current payload buffer so we can continue to use that
+ IoBuffer PayloadBuffer(Size);
+ WriteBuffer = PayloadBuffer.GetMutableView();
+ Snapshot.PayloadBuffers.insert(Snapshot.PayloadBuffers.end() - 1, std::move(PayloadBuffer));
+ }
+ else
+ {
+ IoBuffer PayloadBuffer(Max(Size, PayloadBufferPageSize));
+ WriteBuffer = PayloadBuffer.GetMutableView().Mid(0, Size);
+ Snapshot.PayloadBuffers.emplace_back(std::move(PayloadBuffer));
+ WriteOffset = Size;
+ }
+ }
+ else
+ {
+ WriteBuffer = Snapshot.PayloadBuffers.back().GetMutableView().Mid(WriteOffset, Size);
+ WriteOffset += Size;
+ }
+ WriteBuffer.CopyFrom(Buffer.GetView());
+ CbObjectView CopyView(WriteBuffer.GetData());
+
+ if (CopyView.GetSize() == Size)
+ {
+ Snapshot.Ops.emplace_back(std::move(CopyView));
+ }
+ else
+ {
+ Snapshot.Ops.emplace_back(CbObjectView{});
+ }
+ }
+
+ EntryIndex++;
+ });
+ }
+ return Snapshot;
+}
+
void
ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler, const Paging& EntryPaging)
{
@@ -2710,7 +2839,7 @@ ProjectStore::Oplog::IterateOplogWithKeyRaw(std::function<void(LogSequenceNumber
if (!ReplayOrder.empty())
{
uint32_t EntryIndex = 0;
- m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, const IoBuffer& Buffer) {
+ m_Storage->ReplayLogEntries(m_OpLogPayloads, ReplayOrder, [&](LogSequenceNumber Lsn, IoBuffer&& Buffer) {
const PayloadIndex PayloadOffset = ReplayOrder[EntryIndex];
Handler(Lsn, ReverseKeyMap.at(PayloadOffset), Buffer);
EntryIndex++;
@@ -3917,7 +4046,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
- ZEN_INFO("scrubbing '{}'", ProjectRootDir);
+ ZEN_INFO("scrubbing '{}'", m_OplogStoragePath);
// Scrubbing needs to check all existing oplogs
std::vector<std::string> OpLogs = ScanForOplogs();
@@ -3932,6 +4061,7 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
for (const std::string& OpLogId : OpLogs)
{
+ Ctx.ThrowIfDeadlineExpired();
Ref<ProjectStore::Oplog> OpLog;
{
if (auto OpIt = m_Oplogs.find(OpLogId); OpIt != m_Oplogs.end())
@@ -4358,6 +4488,7 @@ ProjectStore::ScrubStorage(ScrubContext& Ctx)
}
for (const Ref<Project>& Project : Projects)
{
+ Ctx.ThrowIfDeadlineExpired();
Project->Scrub(Ctx);
}
}
@@ -6033,39 +6164,41 @@ public:
{
Ref<ProjectStore::Oplog> Oplog;
- RwLock::SharedLockScope __(m_Project->m_ProjectLock);
- if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
{
- Oplog = It->second;
- Oplog->EnableUpdateCapture();
- m_OplogHasUpdateCapture = true;
- }
- else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
- {
- Stopwatch OplogTimer;
- Oplog = new ProjectStore::Oplog(m_Project->Log(),
- m_Project->Identifier,
- m_OplogId,
- m_Project->m_CidStore,
- m_OplogBasePath,
- std::filesystem::path{},
- ProjectStore::Oplog::EMode::kBasicReadOnly);
- Oplog->Read();
- if (Ctx.Settings.Verbose)
+ RwLock::SharedLockScope __(m_Project->m_ProjectLock);
+ if (auto It = m_Project->m_Oplogs.find(m_OplogId); It != m_Project->m_Oplogs.end())
{
- ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}",
- m_OplogBasePath,
- m_Project->Identifier,
- m_OplogId,
- NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ Oplog = It->second;
+ Oplog->EnableUpdateCapture();
+ m_OplogHasUpdateCapture = true;
+ }
+ else if (ProjectStore::Oplog::ExistsAt(m_OplogBasePath))
+ {
+ Stopwatch OplogTimer;
+ Oplog = new ProjectStore::Oplog(m_Project->Log(),
+ m_Project->Identifier,
+ m_OplogId,
+ m_Project->m_CidStore,
+ m_OplogBasePath,
+ std::filesystem::path{},
+ ProjectStore::Oplog::EMode::kBasicReadOnly);
+ Oplog->Read();
+ if (Ctx.Settings.Verbose)
+ {
+ ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': read oplog '{}/{}' in {}",
+ m_OplogBasePath,
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(OplogTimer.GetElapsedTimeMs()));
+ }
+ }
+ else
+ {
+ return;
}
- }
- else
- {
- return;
}
- RwLock::SharedLockScope ___(Oplog->m_OplogLock);
+ RwLock::SharedLockScope OplogLock(Oplog->m_OplogLock);
if (Ctx.IsCancelledFlag)
{
return;
@@ -6081,7 +6214,45 @@ public:
}
}
- Oplog->GetAttachmentsLocked(m_References, Ctx.Settings.StoreProjectAttachmentMetaData);
+ if (Ctx.Settings.StoreProjectAttachmentMetaData)
+ {
+ Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true);
+ }
+ else
+ {
+ Stopwatch SnapshotTimer;
+ ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked();
+ OplogLock.ReleaseNow();
+
+ uint64_t AllocatedSize =
+ std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(),
+ Snapshot.Ops.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); });
+
+ ZEN_INFO(
+ "GCV2: projectstore [PRECACHE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog '{}/{}' "
+ "in {}",
+ m_OplogBasePath,
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& Op = Snapshot.Ops[Index];
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ }
+ }
+
m_OplogAccessTime = m_Project->LastOplogAccessTime(m_OplogId);
}
FilterReferences(Ctx, fmt::format("projectstore [PRECACHE] '{}'", m_OplogBasePath), m_References);
@@ -6145,7 +6316,43 @@ public:
OplogTimer.Reset();
- Oplog->GetAttachmentsLocked(m_AddedReferences, Ctx.Settings.StoreProjectAttachmentMetaData);
+ if (Ctx.Settings.StoreProjectAttachmentMetaData)
+ {
+ Oplog->GetAttachmentsLocked(m_References, /*StoreMetaDataOnDisk*/ true);
+ }
+ else
+ {
+ Stopwatch SnapshotTimer;
+ ProjectStore::Oplog::OplogSnapshot Snapshot = Oplog->GetSnapshotLocked();
+
+ uint64_t AllocatedSize =
+ std::accumulate(Snapshot.PayloadBuffers.begin(),
+ Snapshot.PayloadBuffers.end(),
+ uint64_t(0),
+ [](uint64_t Current, const IoBuffer& Buffer) { return Current + Buffer.GetSize(); });
+ uint64_t UsedSize =
+ std::accumulate(Snapshot.Ops.begin(),
+ Snapshot.Ops.end(),
+ uint64_t(0),
+ [](uint64_t Current, const CbObjectView& Object) { return Current + Object.GetSize(); });
+
+ ZEN_INFO(
+ "GCV2: projectstore [LOCKSTATE] '{}': Oplog snapshot fetched {} ops from {} op data using {} memory from oplog "
+ "'{}/{}' in {}",
+ m_OplogBasePath,
+ Snapshot.Ops.size(),
+ NiceBytes(UsedSize),
+ NiceBytes(AllocatedSize),
+ m_Project->Identifier,
+ m_OplogId,
+ NiceTimeSpanMs(SnapshotTimer.GetElapsedTimeMs()));
+
+ for (size_t Index = 0; Index < Snapshot.Ops.size(); Index++)
+ {
+ CbObjectView& Op = Snapshot.Ops[Index];
+ Op.IterateAttachments([&](CbFieldView Visitor) { m_AddedReferences.emplace_back(Visitor.AsAttachment()); });
+ }
+ }
}
if (Ctx.Settings.Verbose)
{