aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-25 13:36:53 +0100
committerGitHub Enterprise <[email protected]>2024-11-25 13:36:53 +0100
commite75c5860277681be7b4a18d8d630f76c051719b4 (patch)
tree52533fe3c0e60cfe89e08ff5a4be00b215933670 /src
parentadd missing projectstore expire time in gc log (#227) (diff)
downloadzen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz
zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip
stronger validation of payload existance (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload - Use validation of payload size/existance in all chunk fetch operations in file cas - In project store oplog validate, make sure we can reach all the payloads - Add threading to oplog validate request
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp147
-rw-r--r--src/zenserver/projectstore/projectstore.h2
-rw-r--r--src/zenstore/compactcas.cpp28
-rw-r--r--src/zenstore/filecas.cpp114
-rw-r--r--src/zenstore/filecas.h2
6 files changed, 197 insertions, 101 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 2954bcdc0..4babcd224 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/stream.h>
#include <zencore/trace.h>
#include <zenstore/zenstore.h>
+#include <zenutil/workerpools.h>
namespace zen {
@@ -1164,7 +1165,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
void
HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req)
{
- ZEN_TRACE_CPU("ProjectService::OplogOpNew");
+ ZEN_TRACE_CPU("ProjectService::OplogOpValidate");
using namespace std::literals;
@@ -1197,7 +1198,7 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req)
ProjectStore::Oplog& Oplog = *FoundLog;
std::atomic_bool CancelFlag = false;
- ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag);
+ ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst));
tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup;
KeyNameLookup.reserve(Result.OpKeys.size());
for (const auto& It : Result.OpKeys)
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 1b48a542c..a43c9e0e2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1323,7 +1323,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f
}
ProjectStore::Oplog::ValidationResult
-ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
+ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool)
{
using namespace std::literals;
@@ -1348,25 +1348,27 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
});
Result.OpCount = gsl::narrow<uint32_t>(Keys.size());
- for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
- {
+
+ RwLock ResultLock;
+
+ auto ValidateOne = [&](uint32_t OpIndex) {
const Oid& KeyHash = KeyHashes[OpIndex];
const std::string& Key = Keys[OpIndex];
const OplogEntryMapping& Mapping(Mappings[OpIndex]);
bool HasMissingEntries = false;
for (const ChunkMapping& Chunk : Mapping.Chunks)
{
- if (!m_CidStore.ContainsChunk(Chunk.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload)
{
- Result.MissingChunks.push_back({KeyHash, Chunk});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); });
HasMissingEntries = true;
}
}
for (const ChunkMapping& Meta : Mapping.Meta)
{
- if (!m_CidStore.ContainsChunk(Meta.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload)
{
- Result.MissingMetas.push_back({KeyHash, Meta});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); });
HasMissingEntries = true;
}
}
@@ -1377,15 +1379,15 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath;
if (!std::filesystem::is_regular_file(FilePath))
{
- Result.MissingFiles.push_back({KeyHash, File});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
HasMissingEntries = true;
}
}
else
{
- if (!m_CidStore.ContainsChunk(File.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload)
{
- Result.MissingFiles.push_back({KeyHash, File});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
HasMissingEntries = true;
}
}
@@ -1393,18 +1395,39 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
const std::vector<IoHash>& OpAttachments = Attachments[OpIndex];
for (const IoHash& Attachment : OpAttachments)
{
- if (!m_CidStore.ContainsChunk(Attachment))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload)
{
- Result.MissingAttachments.push_back({KeyHash, Attachment});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); });
HasMissingEntries = true;
}
}
if (HasMissingEntries)
{
- Result.OpKeys.push_back({KeyHash, Key});
+ ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); });
+ }
+ };
+
+ Latch WorkLatch(1);
+
+ for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
+ {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() {
+ auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); });
+ ValidateOne(Index);
+ });
+ }
+ else
+ {
+ ValidateOne(OpIndex);
}
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+
{
// Check if we were deleted while we were checking the references without a lock...
RwLock::SharedLockScope _(m_OplogLock);
@@ -1963,12 +1986,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
try
{
IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
- if (Payload)
+ if (!AsyncCallback(FileChunkIndex, Payload))
{
- if (!AsyncCallback(FileChunkIndex, Payload))
- {
- Result.store(false);
- }
+ Result.store(false);
}
}
catch (const std::exception& Ex)
@@ -3922,11 +3942,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
{
if (WantsSizeField)
{
- Sizes.resize(Ids.size(), 0u);
+ Sizes.resize(Ids.size(), (uint64_t)-1);
}
if (WantsRawSizeField)
{
- RawSizes.resize(Ids.size(), 0u);
+ RawSizes.resize(Ids.size(), (uint64_t)-1);
}
FoundLog->IterateChunks(
@@ -3934,20 +3954,35 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
[&](size_t Index, const IoBuffer& Payload) {
try
{
- uint64_t Size = Payload.GetSize();
- if (WantsRawSizeField)
+ if (Payload)
{
- uint64_t RawSize = Size;
- if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ if (WantsRawSizeField)
+ {
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash _;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSize))
+ {
+ ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.",
+ ProjectId,
+ OplogId,
+ Ids[Index]);
+ }
+ else
+ {
+ RawSizes[Index] = RawSize;
+ }
+ }
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
+ Sizes[Index] = Payload.GetSize();
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ else
{
- Sizes[Index] = Size;
+ ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", ProjectId, OplogId, Ids[Index]);
}
}
catch (const std::exception& Ex)
@@ -3980,11 +4015,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
{
Response << "clientpath"sv << ClientPaths[Index];
}
- if (WantsSizeField)
+ if (WantsSizeField && Sizes[Index] != (uint64_t)-1)
{
Response << "size"sv << Sizes[Index];
}
- if (WantsRawSizeField)
+ if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1)
{
Response << "rawsize"sv << RawSizes[Index];
}
@@ -4062,20 +4097,50 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
[&](size_t Index, const IoBuffer& Chunk) -> bool {
try
{
- uint64_t Size = Chunk.GetSize();
- if (WantsRawSizeField)
+ if (Chunk)
{
- uint64_t RawSize = Size;
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ if (WantsRawSizeField)
+ {
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t RawSize;
+ IoHash RawHash;
+ if (!CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize))
+ {
+ ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.",
+ ProjectId,
+ OplogId,
+ Ids[Index]);
+ }
+ else if (RawHash != Hashes[Index])
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': payload for project file info for id {} does not match expected raw hash {}, found "
+ "{}.",
+ ProjectId,
+ OplogId,
+ Ids[Index],
+ Hashes[Index],
+ RawHash);
+ }
+ else
+ {
+ RawSizes[Index] = RawSize;
+ }
+ }
+ else
+ {
+ RawSizes[Index] = Chunk.GetSize();
+ }
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ Sizes[Index] = Chunk.GetSize();
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ else
{
- Sizes[Index] = Size;
+ ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", ProjectId, OplogId, Ids[Index]);
}
}
catch (const std::exception& Ex)
@@ -5782,7 +5847,7 @@ public:
if (Oplog != nullptr)
{
- Result = Oplog->Validate(Ctx.IsCancelledFlag);
+ Result = Oplog->Validate(Ctx.IsCancelledFlag, nullptr);
if (Ctx.IsCancelledFlag)
{
return;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 1619151dd..d8b13585b 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -212,7 +212,7 @@ public:
}
};
- ValidationResult Validate(std::atomic_bool& IsCancelledFlag);
+ ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool);
private:
struct FileMapEntry
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b3309f7a7..bc30301d1 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -307,6 +307,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool)
{
+ if (ChunkHashes.size() < 3)
+ {
+ for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
+ {
+ IoBuffer Chunk = FindChunk(ChunkHashes[ChunkIndex]);
+ if (!AsyncCallback(ChunkIndex, Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
std::vector<size_t> FoundChunkIndexes;
std::vector<BlockStoreLocation> FoundChunkLocations;
RwLock::SharedLockScope _(m_LocationMapLock);
@@ -326,7 +338,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
[&](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
- return true;
+ return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
}
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
},
@@ -338,7 +350,19 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
Latch WorkLatch(1);
std::atomic_bool AsyncContinue = true;
bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool)
+ if (ChunkIndexes.size() < 3)
+ {
+ for (size_t ChunkIndex : ChunkIndexes)
+ {
+ IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]);
+ if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else if (OptionalWorkerPool)
{
WorkLatch.AddCount(1);
OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() {
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 62ed44bbb..3cee9f076 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -446,30 +446,11 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
IoBuffer
-FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+FileCasStrategy::SafeOpenChunk(const IoHash& ChunkHash, uint64 ExpectedSize)
{
- ZEN_TRACE_CPU("FileCas::FindChunk");
-
- ZEN_ASSERT(m_IsInitialized);
-
- uint64_t ExpectedSize = 0;
- {
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
- {
- ExpectedSize = It->second.Size;
- }
- else
- {
- return {};
- }
- }
-
ShardingHelper Name(m_RootDirectory, ChunkHash);
const std::filesystem::path ChunkPath = Name.ShardedPath.ToPath();
-
- RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
-
+ RwLock::SharedLockScope ShardLock(LockForHash(ChunkHash));
if (IoBuffer Chunk = IoBufferBuilder::MakeFromFile(ChunkPath); Chunk)
{
uint64 ChunkSize = Chunk.GetSize();
@@ -536,6 +517,29 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash)
return {};
}
+IoBuffer
+FileCasStrategy::FindChunk(const IoHash& ChunkHash)
+{
+ ZEN_TRACE_CPU("FileCas::FindChunk");
+
+ ZEN_ASSERT(m_IsInitialized);
+
+ uint64_t ExpectedSize = 0;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Index.find(ChunkHash); It != m_Index.end())
+ {
+ ExpectedSize = It->second.Size;
+ }
+ else
+ {
+ return {};
+ }
+ }
+
+ return SafeOpenChunk(ChunkHash, ExpectedSize);
+}
+
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
@@ -597,7 +601,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
WorkerThreadPool* OptionalWorkerPool)
{
- std::vector<size_t> FoundChunkIndexes;
+ std::vector<size_t> FoundChunkIndexes;
+ std::vector<uint64_t> FoundChunkExpectedSizes;
{
RwLock::SharedLockScope _(m_Lock);
for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++)
@@ -605,28 +610,28 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (auto KeyIt = m_Index.find(ChunkHashes[ChunkIndex]); KeyIt != m_Index.end())
{
FoundChunkIndexes.push_back(ChunkIndex);
+ FoundChunkExpectedSizes.push_back(KeyIt->second.Size);
}
}
}
std::atomic_bool Continue = true;
if (!FoundChunkIndexes.empty())
{
- auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex) {
- ShardingHelper Name(m_RootDirectory, ChunkHashes[ChunkIndex]);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
- if (Payload)
+ auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) {
+ const IoHash& ChunkHash = ChunkHashes[ChunkIndex];
+ IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
+ if (!AsyncCallback(ChunkIndex, std::move(Payload)))
{
- if (!AsyncCallback(ChunkIndex, std::move(Payload)))
- {
- return false;
- }
+ return false;
}
return true;
};
Latch WorkLatch(1);
- for (size_t ChunkIndex : FoundChunkIndexes)
+ for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
if (!Continue)
{
break;
@@ -634,7 +639,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
if (OptionalWorkerPool)
{
WorkLatch.AddCount(1);
- OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, &Continue]() {
+ OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
if (!Continue)
{
@@ -642,7 +647,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
try
{
- if (!ProcessOne(ChunkIndex))
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
{
Continue = false;
}
@@ -658,7 +663,7 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
}
else
{
- Continue = Continue && ProcessOne(ChunkIndex);
+ Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize);
}
}
WorkLatch.CountDown();
@@ -674,28 +679,25 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&&
ZEN_ASSERT(m_IsInitialized);
- RwLock::SharedLockScope _(m_Lock);
- for (const auto& It : m_Index)
+ std::vector<IoHash> RawHashes;
+ std::vector<uint64_t> ExpectedSizes;
+
{
- const IoHash& NameHash = It.first;
- ShardingHelper Name(m_RootDirectory, NameHash);
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(Name.ShardedPath.ToPath());
- Callback(NameHash, std::move(Payload));
+ RwLock::SharedLockScope _(m_Lock);
+ RawHashes.reserve(m_Index.size());
+ ExpectedSizes.reserve(m_Index.size());
+ for (const auto& It : m_Index)
+ {
+ RawHashes.push_back(It.first);
+ ExpectedSizes.push_back(It.second.Size);
+ }
}
-}
-
-void
-FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, uint64_t Size)>&& Callback)
-{
- ZEN_TRACE_CPU("FileCas::IterateChunks");
-
- ZEN_ASSERT(m_IsInitialized);
-
- RwLock::SharedLockScope _(m_Lock);
- for (const auto& It : m_Index)
+ for (size_t Index = 0; Index < RawHashes.size(); Index++)
{
- const IoHash& NameHash = It.first;
- Callback(NameHash, It.second.Size);
+ const IoHash& ChunkHash = RawHashes[Index];
+ const uint64_t ExpectedSize = ExpectedSizes[Index];
+ IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize);
+ Callback(ChunkHash, std::move(Payload));
}
}
@@ -1259,7 +1261,11 @@ public:
if (Ec)
{
// Target file may be open for read, attempt to move it to a temp file and mark it delete on close
- IoBuffer OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ IoBuffer OldChunk;
+ {
+ RwLock::SharedLockScope HashLock(m_FileCasStrategy.LockForHash(ChunkHash));
+ OldChunk = IoBufferBuilder::MakeFromFile(ChunkPath);
+ }
if (OldChunk)
{
std::filesystem::path TempPath(ChunkPath.parent_path() / Oid::NewOid().ToString());
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index fb4b1888b..21d8c3b9e 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -90,8 +90,8 @@ private:
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
void IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
- void IterateChunks(std::function<void(const IoHash& Hash, uint64_t PayloadSize)>&& Callback);
void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
+ IoBuffer SafeOpenChunk(const IoHash& ChunkHash, uint64_t ExpectedSize);
struct ShardingHelper
{