aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/filecas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-25 13:36:53 +0100
committerGitHub Enterprise <[email protected]>2024-11-25 13:36:53 +0100
commite75c5860277681be7b4a18d8d630f76c051719b4 (patch)
tree52533fe3c0e60cfe89e08ff5a4be00b215933670 /src/zenstore/filecas.cpp
parentadd missing projectstore expire time in gc log (#227) (diff)
downloadzen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz
zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip
stronger validation of payload existance (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload - Use validation of payload size/existance in all chunk fetch operations in file cas - In project store oplog validate, make sure we can reach all the payloads - Add threading to oplog validate request
Diffstat (limited to 'src/zenstore/filecas.cpp')
-rw-r--r--src/zenstore/filecas.cpp114
1 files changed, 60 insertions, 54 deletions
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 62ed44bbb..3cee9f076 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -446,30 +446,11 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
IoBuffer
-FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize)
{
- ZEN_TRACE_CPU("FileCas::FindChunk");
-
- ZEN_ASSERT(m_IsInitialized);
-
- uint64_t ExpectedSize = 0;
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
- {
- ExpectedSize = It->second.Size;
- }
- else
- {
- return {};
- }
- }
-
ShardingHelper Name(m_RootDirectory, ChunkHash);
const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
-
- RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
-
+ RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk)
{
uint64 ChunkSize = Chunk.GetSize();
@@ -536,6 +517,29 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
return {};
}
+IoBuffer
+FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+{
+ ZEN_TRACE_CPU("FileCas::FindChunk");
+
+ ZEN_ASSERT(m_IsInitialized);
+
+ uint64_t ExpectedSize = 0;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+ {
+ ExpectedSize = It->second.Size;
+ }
+ else
+ {
+ return {};
+ }
+ }
+
+ return SafeOpenChunk(ChunkHash, ExpectedSize);
+}
+
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
@@ -597,7 +601,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool)
{
- std::vector<size_t> FoundChunkIndexes;
+ std::vector<size_t> FoundChunkIndexes;
+ std::vector<uint64_t> FoundChunkExpectedSizes;
{
RwLock::SharedLockScope _(m_Lock);
for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
@@ -605,28 +610,28 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end())
{
FoundChunkIndexes.push_back(ChunkIndex);
+ FoundChunkExpectedSizes.push_back(KeyIt->second.Size);
}
}
}
std::atomic_bool Continue = true;
if (!FoundChunkIndexes.empty())
{
- auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
- ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
- if (Payload)
+ auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
+ IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
+ if (!AsyncCallback(ChunkIndex, std::move(Payload)))
{
- if (!AsyncCallback(ChunkIndex, std::move(Payload)))
- {
- return false;
- }
+ return false;
}
return true;
};
Latch WorkLatch(1);
- for (size_t ChunkIndex : FoundChunkIndexes)
+ for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
if (!Continue)
{
break;
@@ -634,7 +639,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (OptionalWorkerPool)
{
WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() {
+ OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (!Continue)
{
@@ -642,7 +647,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
try
{
- if (!ProcessOne(ChunkIndex))
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
{
Continue = false;
}
@@ -658,7 +663,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
else
{
- Continue = Continue && ProcessOne(ChunkIndex);
+ Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
}
}
WorkLatch.CountDown();
@@ -674,28 +679,25 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&&
ZEN_ASSERT(m_IsInitialized);
- RwLock::SharedLockScope _(m_Lock);
- for (const auto& It : m_Index)
+ std::vector<IoHash> RawHashes;
+ std::vector<uint64_t> ExpectedSizes;
+
{
- const IoHash& NameHash = It.first;
- ShardingHelper Name(m_RootDirectory, NameHash);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
- Callback(NameHash, std::move(Payload));
+ RwLock::SharedLockScope _(m_Lock);
+ RawHashes.reserve(m_Index.size());
+ ExpectedSizes.reserve(m_Index.size());
+ for (const auto& It : m_Index)
+ {
+ RawHashes.push_back(It.first);
+ ExpectedSizes.push_back(It.second.Size);
+ }
}
-}
-
-void
-FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback)
-{
- ZEN_TRACE_CPU("FileCas::IterateChunks");
-
- ZEN_ASSERT(m_IsInitialized);
-
- RwLock::SharedLockScope _(m_Lock);
- for (const auto& It : m_Index)
+ for (size_t Index = 0; Index < RawHashes.size(); Index++)
{
- const IoHash& NameHash = It.first;
- Callback(NameHash, It.second.Size);
+ const IoHash& ChunkHash = RawHashes[Index];
+ const uint64_t ExpectedSize = ExpectedSizes[Index];
+ IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
+ Callback(ChunkHash, std::move(Payload));
}
}
@@ -1259,7 +1261,11 @@ public:
if (Ec)
{
// Target file may be open for read, attempt to move it to a temp file and mark it delete on close
- IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ IoBuffer OldChunk;
+ {
+ RwLock::SharedLockScope HashLock(m_FileCasStrategy.LockForHash(ChunkHash));
+ OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ }
if (OldChunk)
{
std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());