aboutsummaryrefslogtreecommitdiff
path: root/zenstore/blockstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-03 22:23:26 +0200
committerGitHub <[email protected]>2022-05-03 22:23:26 +0200
commitc5b2435192f382fbaa39a8ff67de16ee3b69b7a6 (patch)
tree294bb596d61582744dd7901f6a464c324bdec3d2 /zenstore/blockstore.cpp
parentMerge pull request #84 from EpicGames/de/cleanup-lock-sharding-in-iobuffer (diff)
parentmacos compilation fix (diff)
downloadzen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.tar.xz
zen-c5b2435192f382fbaa39a8ff67de16ee3b69b7a6.zip
Merge pull request #86 from EpicGames/de/block-store-refactor
structured cache with block store
Diffstat (limited to 'zenstore/blockstore.cpp')
-rw-r--r--zenstore/blockstore.cpp1207
1 files changed, 1205 insertions, 2 deletions
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 1eb859d5a..0992662c2 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -1,13 +1,17 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "compactcas.h"
-
#include <zenstore/blockstore.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zencore/workthreadpool.h>
# include <algorithm>
# include <random>
#endif
@@ -102,12 +106,814 @@ BlockStoreFile::Flush()
m_File.Flush();
}
+// Exposes the underlying BasicFile for callers that need direct low-level
+// file operations on this block.
+BasicFile&
+BlockStoreFile::GetBasicFile()
+{
+	return m_File;
+}
+
// Streams the byte range [FileOffset, FileOffset + Size) from the backing
// file, invoking ChunkFun for each delivered buffer; delegates to BasicFile.
void
BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
{
	m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
}
+constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024;
+
+// Brings the block store online, rooted at BlocksBasePath.
+//
+// Scans the directory tree for existing block files (*.ucas), opens the ones
+// referenced by KnownLocations and deletes any block file that is no longer
+// referenced. Creates the base directory when it does not yet exist.
+//
+// MaxBlockCount must be a power of two because block indexes are wrapped with
+// a bit mask (Index & (MaxBlockCount - 1)) elsewhere in the store.
+void
+BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
+                       uint64_t MaxBlockSize,
+                       uint64_t MaxBlockCount,
+                       const std::vector<BlockStoreLocation>& KnownLocations)
+{
+    ZEN_ASSERT(MaxBlockSize > 0);
+    ZEN_ASSERT(MaxBlockCount > 0);
+    ZEN_ASSERT(IsPow2(MaxBlockCount));
+
+    m_BlocksBasePath = BlocksBasePath;
+    m_MaxBlockSize = MaxBlockSize;
+    // BUGFIX: the block count limit was validated but never stored, so the
+    // wrap mask and capacity checks in WriteChunk/Flush/ReclaimSpace used a
+    // stale or default value.
+    m_MaxBlockCount = MaxBlockCount;
+
+    m_ChunkBlocks.clear();
+
+    // Blocks referenced by the caller's index survive; anything else found on
+    // disk is treated as garbage and removed below.
+    std::unordered_set<uint32_t> KnownBlocks;
+    for (const auto& Entry : KnownLocations)
+    {
+        KnownBlocks.insert(Entry.BlockIndex);
+    }
+
+    if (std::filesystem::is_directory(m_BlocksBasePath))
+    {
+        // Breadth-first sweep of the directory tree, queuing sub-folders
+        // instead of recursing.
+        std::vector<std::filesystem::path> FoldersToScan;
+        FoldersToScan.push_back(m_BlocksBasePath);
+        size_t FolderOffset = 0;
+        while (FolderOffset < FoldersToScan.size())
+        {
+            for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset]))
+            {
+                if (Entry.is_directory())
+                {
+                    FoldersToScan.push_back(Entry.path());
+                    continue;
+                }
+                if (Entry.is_regular_file())
+                {
+                    const std::filesystem::path Path = Entry.path();
+                    if (Path.extension() != GetBlockFileExtension())
+                    {
+                        continue;
+                    }
+                    // File stem is the block index as a hex number.
+                    std::string FileName = Path.stem().string();
+                    uint32_t BlockIndex;
+                    bool OK = ParseHexNumber(FileName, BlockIndex);
+                    if (!OK)
+                    {
+                        continue;
+                    }
+                    if (!KnownBlocks.contains(BlockIndex))
+                    {
+                        // Clear out unused blocks
+                        ZEN_INFO("removing unused block at '{}'", Path);
+                        std::error_code Ec;
+                        std::filesystem::remove(Path, Ec);
+                        if (Ec)
+                        {
+                            ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message());
+                        }
+                        continue;
+                    }
+                    Ref<BlockStoreFile> BlockFile = new BlockStoreFile(Path);
+                    BlockFile->Open();
+                    m_ChunkBlocks[BlockIndex] = BlockFile;
+                }
+            }
+            ++FolderOffset;
+        }
+    }
+    else
+    {
+        CreateDirectories(m_BlocksBasePath);
+    }
+}
+
+// Shuts the store down: drops the active write block, resets insert state and
+// releases every open block file. Safe to call more than once.
+void
+BlockStore::Close()
+{
+    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+    m_WriteBlock = nullptr;
+    m_CurrentInsertOffset = 0;
+    m_WriteBlockIndex = 0;
+    m_ChunkBlocks.clear();
+    m_BlocksBasePath.clear();
+}
+
+// Appends a chunk to the current write block and returns where it landed.
+//
+// Rolls over to a freshly created block when there is no active write block
+// or the chunk would not fit in the remaining space. Only the offset/index
+// bookkeeping is serialized under the insert lock; the file write itself
+// happens after the lock is released (concurrent writers target disjoint
+// byte ranges of the block).
+//
+// Throws std::runtime_error when all m_MaxBlockCount block slots are in use.
+BlockStoreLocation
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
+{
+    ZEN_ASSERT(Data != nullptr);
+    ZEN_ASSERT(Size > 0u);
+    ZEN_ASSERT(Size <= m_MaxBlockSize);
+    ZEN_ASSERT(Alignment > 0u);
+
+    RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+
+    uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+    bool IsWriting = m_WriteBlock != nullptr;
+    if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize)
+    {
+        if (m_WriteBlock)
+        {
+            m_WriteBlock = nullptr;
+        }
+        {
+            if (m_ChunkBlocks.size() == m_MaxBlockCount)
+            {
+                throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
+            }
+            // BUGFIX: keep the candidate index inside [0, m_MaxBlockCount) by
+            // masking after the increment; the previous unmasked "+= 1" could
+            // step past the last slot and allocate an out-of-range block
+            // index that the masked wrap arithmetic elsewhere never revisits.
+            WriteBlockIndex = (WriteBlockIndex + (IsWriting ? 1 : 0)) & (m_MaxBlockCount - 1);
+            while (m_ChunkBlocks.contains(WriteBlockIndex))
+            {
+                WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+            }
+            std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+            m_WriteBlock = new BlockStoreFile(BlockPath);
+            m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
+            m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+        }
+        m_CurrentInsertOffset = 0;
+        m_WriteBlock->Create(m_MaxBlockSize);
+    }
+    uint64_t InsertOffset = m_CurrentInsertOffset;
+    // Next insert starts at the aligned end of this chunk.
+    m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
+    Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
+    InsertLock.ReleaseNow();
+
+    // File I/O outside the lock; the reserved range is exclusively ours.
+    WriteBlock->Write(Data, Size, InsertOffset);
+
+    return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
+}
+
+// Captures a consistent view for ReclaimSpace: the number of live blocks and
+// the block currently being appended to (which reclaim must skip).
+// ExcludeBlockIndex is 0xffffffff when there is no active write block.
+BlockStore::ReclaimSnapshotState
+BlockStore::GetReclaimSnapshotState()
+{
+    ReclaimSnapshotState State;
+    RwLock::ExclusiveLockScope _(m_InsertLock);
+    State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu;
+    State.BlockCount = m_ChunkBlocks.size();
+    _.ReleaseNow();
+    return State;
+}
+
+// Looks up the open block file holding the given location; returns an empty
+// Ref when the block is unknown (e.g. already reclaimed).
+Ref<BlockStoreFile>
+BlockStore::GetChunkBlock(const BlockStoreLocation& Location)
+{
+    RwLock::SharedLockScope InsertLock(m_InsertLock);
+    if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end())
+    {
+        return BlockIt->second;
+    }
+    return {};
+}
+
+// Retires the active write block: if anything has been appended, the block is
+// closed for further inserts so the next WriteChunk allocates a fresh one.
+// Note this does not flush file contents itself (see BlockStoreFile::Flush).
+void
+BlockStore::Flush()
+{
+    RwLock::ExclusiveLockScope _(m_InsertLock);
+    if (m_CurrentInsertOffset > 0)
+    {
+        uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire);
+        // Advance (wrapping on the power-of-two block count) so the next
+        // write block starts from a new candidate index.
+        WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+        m_WriteBlock = nullptr;
+        m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+        m_CurrentInsertOffset = 0;
+    }
+}
+
+// Compacts the store: blocks whose chunks are all garbage are deleted
+// outright, blocks mixing live and dead chunks are rewritten into freshly
+// allocated blocks, reclaiming the space of chunks absent from
+// KeepChunkIndexes.
+//
+// Snapshot comes from GetReclaimSnapshotState(); the active write block
+// (Snapshot.ExcludeBlockIndex) is never touched. ChangeCallback receives
+// (moved chunks, deleted chunk indexes) batches so the caller can keep its
+// index consistent. DiskReserveCallback is asked to free reserve space when
+// the disk is too full to create a replacement block. When DryRun is set,
+// only statistics are logged.
+void
+BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
+                         const std::vector<BlockStoreLocation>& ChunkLocations,
+                         const ChunkIndexArray& KeepChunkIndexes,
+                         uint64_t PayloadAlignment,
+                         bool DryRun,
+                         const ReclaimCallback& ChangeCallback,
+                         const ClaimDiskReserveCallback& DiskReserveCallback)
+{
+    if (ChunkLocations.empty())
+    {
+        return;
+    }
+    // NOTE(review): the "write lock" counters below accumulate time spent
+    // under the *shared* lock and the "read lock" counters time under the
+    // *exclusive* lock -- the labels look swapped, but they are kept as-is
+    // to preserve log continuity; confirm intent.
+    uint64_t WriteBlockTimeUs = 0;
+    uint64_t WriteBlockLongestTimeUs = 0;
+    uint64_t ReadBlockTimeUs = 0;
+    uint64_t ReadBlockLongestTimeUs = 0;
+    uint64_t TotalChunkCount = ChunkLocations.size();
+    uint64_t DeletedSize = 0;
+    uint64_t OldTotalSize = 0;
+    uint64_t NewTotalSize = 0;
+
+    uint64_t MovedCount = 0;
+    uint64_t DeletedCount = 0;
+
+    // Emit the summary line on every exit path via this scope guard.
+    Stopwatch TotalTimer;
+    const auto _ = MakeGuard([&] {
+        ZEN_INFO(
+            "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted #{} and moved "
+            "#{} "
+            "of #{} "
+            "chunks ({}).",
+            m_BlocksBasePath,
+            NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()),
+            NiceLatencyNs(WriteBlockTimeUs),
+            NiceLatencyNs(WriteBlockLongestTimeUs),
+            NiceLatencyNs(ReadBlockTimeUs),
+            NiceLatencyNs(ReadBlockLongestTimeUs),
+            NiceBytes(DeletedSize),
+            DeletedCount,
+            MovedCount,
+            TotalChunkCount,
+            NiceBytes(OldTotalSize));
+    });
+
+    size_t BlockCount = Snapshot.BlockCount;
+
+    // Fast membership test for "this chunk must survive".
+    std::unordered_set<size_t> KeepChunkMap;
+    KeepChunkMap.reserve(KeepChunkIndexes.size());
+    for (size_t KeepChunkIndex : KeepChunkIndexes)
+    {
+        KeepChunkMap.insert(KeepChunkIndex);
+    }
+
+    // Bucket chunk indexes per block into keep/delete lists.
+    std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex;
+    std::vector<ChunkIndexArray> BlockKeepChunks;
+    std::vector<ChunkIndexArray> BlockDeleteChunks;
+
+    BlockIndexToChunkMapIndex.reserve(BlockCount);
+    BlockKeepChunks.reserve(BlockCount);
+    BlockDeleteChunks.reserve(BlockCount);
+    size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2;
+
+    size_t DeleteCount = 0;
+    for (size_t Index = 0; Index < TotalChunkCount; ++Index)
+    {
+        const BlockStoreLocation& Location = ChunkLocations[Index];
+        OldTotalSize += Location.Size;
+        if (Location.BlockIndex == Snapshot.ExcludeBlockIndex)
+        {
+            continue;
+        }
+
+        auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex);
+        size_t ChunkMapIndex = 0;
+        if (BlockIndexPtr == BlockIndexToChunkMapIndex.end())
+        {
+            ChunkMapIndex = BlockKeepChunks.size();
+            BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex;
+            BlockKeepChunks.resize(ChunkMapIndex + 1);
+            BlockKeepChunks.back().reserve(GuesstimateCountPerBlock);
+            BlockDeleteChunks.resize(ChunkMapIndex + 1);
+            BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock);
+        }
+        else
+        {
+            ChunkMapIndex = BlockIndexPtr->second;
+        }
+
+        if (KeepChunkMap.contains(Index))
+        {
+            ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex];
+            IndexMap.push_back(Index);
+            NewTotalSize += Location.Size;
+            continue;
+        }
+        ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex];
+        IndexMap.push_back(Index);
+        DeleteCount++;
+    }
+
+    // Only blocks with at least one dead chunk need rewriting or deleting.
+    std::unordered_set<uint32_t> BlocksToReWrite;
+    BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size());
+    for (const auto& Entry : BlockIndexToChunkMapIndex)
+    {
+        uint32_t BlockIndex = Entry.first;
+        size_t ChunkMapIndex = Entry.second;
+        const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex];
+        if (ChunkMap.empty())
+        {
+            continue;
+        }
+        BlocksToReWrite.insert(BlockIndex);
+    }
+
+    if (DryRun)
+    {
+        ZEN_INFO("garbage collect for '{}' DISABLED, found #{} {} chunks of total #{} {}",
+                 m_BlocksBasePath,
+                 DeleteCount,
+                 NiceBytes(OldTotalSize - NewTotalSize),
+                 TotalChunkCount,
+                 OldTotalSize);
+        return;
+    }
+
+    Ref<BlockStoreFile> NewBlockFile;
+    try
+    {
+        uint64_t WriteOffset = 0;
+        uint32_t NewBlockIndex = 0;
+        for (uint32_t BlockIndex : BlocksToReWrite)
+        {
+            const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
+
+            Ref<BlockStoreFile> OldBlockFile;
+            {
+                RwLock::SharedLockScope _i(m_InsertLock);
+                Stopwatch Timer;
+                const auto __ = MakeGuard([&] {
+                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                    WriteBlockTimeUs += ElapsedUs;
+                    WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
+                });
+                OldBlockFile = m_ChunkBlocks[BlockIndex];
+                ZEN_ASSERT(OldBlockFile);
+            }
+
+            const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex];
+            if (KeepMap.empty())
+            {
+                // Nothing to keep: drop the whole block.
+                const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+                for (size_t DeleteIndex : DeleteMap)
+                {
+                    DeletedSize += ChunkLocations[DeleteIndex].Size;
+                }
+                ChangeCallback({}, DeleteMap);
+                DeletedCount += DeleteMap.size();
+                {
+                    RwLock::ExclusiveLockScope _i(m_InsertLock);
+                    Stopwatch Timer;
+                    const auto __ = MakeGuard([&] {
+                        uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                        ReadBlockTimeUs += ElapsedUs;
+                        ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                    });
+                    m_ChunkBlocks[BlockIndex] = nullptr;
+                }
+                ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+                std::error_code Ec;
+                OldBlockFile->MarkAsDeleteOnClose(Ec);
+                if (Ec)
+                {
+                    ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+                }
+                continue;
+            }
+
+            // Copy every surviving chunk into the current replacement block,
+            // rolling over to a new block whenever it would overflow.
+            MovedChunksArray MovedChunks;
+            std::vector<uint8_t> Chunk;
+            for (const size_t& ChunkIndex : KeepMap)
+            {
+                const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+                Chunk.resize(ChunkLocation.Size);
+                OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+
+                if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize))
+                {
+                    uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
+
+                    if (NewBlockFile)
+                    {
+                        NewBlockFile->Truncate(WriteOffset);
+                        NewBlockFile->Flush();
+                        NewBlockFile = nullptr;
+                    }
+                    {
+                        // Flush the chunks moved so far before rolling over.
+                        ChangeCallback(MovedChunks, {});
+                        // BUGFIX: count the chunks actually reported in this
+                        // batch; the remainder is counted when the source
+                        // block completes. (was KeepMap.size(), which
+                        // double-counted chunks on every rollover)
+                        MovedCount += MovedChunks.size();
+                        MovedChunks.clear();
+                        RwLock::ExclusiveLockScope __(m_InsertLock);
+                        Stopwatch Timer;
+                        const auto ___ = MakeGuard([&] {
+                            uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                            ReadBlockTimeUs += ElapsedUs;
+                            ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                        });
+                        if (m_ChunkBlocks.size() == m_MaxBlockCount)
+                        {
+                            // BUGFIX: report the configured limit; the message
+                            // previously said "exeeded" and printed the uint32
+                            // maximum instead of m_MaxBlockCount.
+                            ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exceeded",
+                                      m_BlocksBasePath,
+                                      m_MaxBlockCount);
+                            return;
+                        }
+                        while (m_ChunkBlocks.contains(NextBlockIndex))
+                        {
+                            NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1);
+                        }
+                        std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex);
+                        NewBlockFile = new BlockStoreFile(NewBlockPath);
+                        m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
+                    }
+
+                    // Make sure there is room on disk, dipping into the gc
+                    // reserve as a last resort.
+                    std::error_code Error;
+                    DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+                    if (Error)
+                    {
+                        ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
+                        return;
+                    }
+                    if (Space.Free < m_MaxBlockSize)
+                    {
+                        uint64_t ReclaimedSpace = DiskReserveCallback();
+                        if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+                        {
+                            ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+                                     m_BlocksBasePath,
+                                     m_MaxBlockSize,
+                                     NiceBytes(Space.Free + ReclaimedSpace));
+                            RwLock::ExclusiveLockScope _l(m_InsertLock);
+                            Stopwatch Timer;
+                            const auto __ = MakeGuard([&] {
+                                uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                                ReadBlockTimeUs += ElapsedUs;
+                                ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                            });
+                            m_ChunkBlocks.erase(NextBlockIndex);
+                            return;
+                        }
+
+                        ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+                                 m_BlocksBasePath,
+                                 ReclaimedSpace,
+                                 NiceBytes(Space.Free + ReclaimedSpace));
+                    }
+                    NewBlockFile->Create(m_MaxBlockSize);
+                    NewBlockIndex = NextBlockIndex;
+                    WriteOffset = 0;
+                }
+
+                NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
+                MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}});
+                WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment);
+            }
+            Chunk.clear();
+            if (NewBlockFile)
+            {
+                NewBlockFile->Truncate(WriteOffset);
+                NewBlockFile->Flush();
+                NewBlockFile = nullptr;
+            }
+
+            const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex];
+            for (size_t DeleteIndex : DeleteMap)
+            {
+                DeletedSize += ChunkLocations[DeleteIndex].Size;
+            }
+
+            ChangeCallback(MovedChunks, DeleteMap);
+            // BUGFIX: count only the chunks reported in this final batch (any
+            // earlier rollover batches were already counted above).
+            MovedCount += MovedChunks.size();
+            DeletedCount += DeleteMap.size();
+            MovedChunks.clear();
+            {
+                RwLock::ExclusiveLockScope __(m_InsertLock);
+                Stopwatch Timer;
+                const auto ___ = MakeGuard([&] {
+                    uint64_t ElapsedUs = Timer.GetElapsedTimeUs();
+                    ReadBlockTimeUs += ElapsedUs;
+                    ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs);
+                });
+                m_ChunkBlocks[BlockIndex] = nullptr;
+            }
+            ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex);
+            std::error_code Ec;
+            OldBlockFile->MarkAsDeleteOnClose(Ec);
+            if (Ec)
+            {
+                ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", OldBlockFile->GetPath(), Ec.message());
+            }
+            OldBlockFile = nullptr;
+        }
+    }
+    catch (std::exception& ex)
+    {
+        ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what());
+        if (NewBlockFile)
+        {
+            // The replacement block may be half-written; throw it away.
+            ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath());
+            std::error_code Ec;
+            NewBlockFile->MarkAsDeleteOnClose(Ec);
+            if (Ec)
+            {
+                ZEN_WARN("Failed to flag file '{}' for deletion: '{}'", NewBlockFile->GetPath(), Ec.message());
+            }
+        }
+    }
+}
+
+// Validation sweep over every chunk in the store.
+//
+// Reads each block file sequentially in ScrubSmallChunkWindowSize windows and
+// invokes SmallSizeCallback for every chunk fully contained in the current
+// window (passing a pointer into the read buffer). Chunks that straddle a
+// window boundary or exceed the window size are deferred and handed to
+// LargeSizeCallback so they can be streamed separately.
+void
+BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
+                          IterateChunksSmallSizeCallback SmallSizeCallback,
+                          IterateChunksLargeSizeCallback LargeSizeCallback)
+{
+    // We do a read sweep through the payloads file and validate
+    // any entries that are contained within each segment, with
+    // the assumption that most entries will be checked in this
+    // pass. An alternative strategy would be to use memory mapping.
+
+    {
+        ChunkIndexArray BigChunks;
+        IoBuffer ReadBuffer{ScrubSmallChunkWindowSize};
+        void* BufferBase = ReadBuffer.MutableData();
+
+        RwLock::SharedLockScope _(m_InsertLock);
+
+        for (const auto& Block : m_ChunkBlocks)
+        {
+            uint64_t WindowStart = 0;
+            uint64_t WindowEnd = ScrubSmallChunkWindowSize;
+            uint32_t BlockIndex = Block.first;
+            const Ref<BlockStoreFile>& BlockFile = Block.second;
+            const uint64_t FileSize = BlockFile->FileSize();
+
+            do
+            {
+                // Read the next window (the final window may be short).
+                const uint64_t ChunkSize = Min(ScrubSmallChunkWindowSize, FileSize - WindowStart);
+                BlockFile->Read(BufferBase, ChunkSize, WindowStart);
+
+                // TODO: We could be smarter here if the ChunkLocations were sorted on block index - we could
+                // then only scan a subset of ChunkLocations instead of scanning through them all...
+                for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex)
+                {
+                    const BlockStoreLocation Location = ChunkLocations[ChunkIndex];
+                    if (BlockIndex != Location.BlockIndex)
+                    {
+                        continue;
+                    }
+
+                    const uint64_t EntryOffset = Location.Offset;
+                    if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+                    {
+                        const uint64_t EntryEnd = EntryOffset + Location.Size;
+
+                        if (EntryEnd >= WindowEnd)
+                        {
+                            // Extends past this window: defer to the large-chunk pass.
+                            BigChunks.push_back(ChunkIndex);
+
+                            continue;
+                        }
+
+                        SmallSizeCallback(ChunkIndex,
+                                          reinterpret_cast<uint8_t*>(BufferBase) + Location.Offset - WindowStart,
+                                          Location.Size);
+                    }
+                }
+
+                WindowStart += ScrubSmallChunkWindowSize;
+                WindowEnd += ScrubSmallChunkWindowSize;
+            } while (WindowStart < FileSize);
+        }
+
+        // Deal with large chunks and chunks that extend over a ScrubSmallChunkWindowSize border
+        for (size_t ChunkIndex : BigChunks)
+        {
+            const BlockStoreLocation Location = ChunkLocations[ChunkIndex];
+            // NOTE(review): operator[] default-inserts a null Ref when the
+            // block index is missing -- this assumes every location references
+            // a live block; confirm callers guarantee that.
+            const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[Location.BlockIndex];
+            LargeSizeCallback(ChunkIndex, BlockFile, Location.Offset, Location.Size);
+        }
+    }
+}
+
+// Migration helper: splits one legacy monolithic block file
+// (SourceBlockFilePath) into block files of at most MaxBlockSize bytes under
+// BlocksBasePath, reporting the new location of every chunk via Callback
+// (one batch per produced block).
+//
+// With CleanSource set the source file is opened writable and truncated from
+// the tail as each block is copied out, so the migration needs little extra
+// disk space; if everything fits into a single block the source is simply
+// renamed into place. Returns false when disk space or the block count
+// budget is insufficient.
+bool
+BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations,
+                  const std::filesystem::path& SourceBlockFilePath,
+                  const std::filesystem::path& BlocksBasePath,
+                  uint64_t MaxBlockSize,
+                  uint64_t MaxBlockCount,
+                  size_t PayloadAlignment,
+                  bool CleanSource,
+                  const SplitCallback& Callback)
+{
+    std::error_code Error;
+    DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error);
+    if (Error)
+    {
+        ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message());
+        return false;
+    }
+
+    if (Space.Free < MaxBlockSize)
+    {
+        ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
+                  BlocksBasePath,
+                  MaxBlockSize,
+                  NiceBytes(Space.Free));
+        return false;
+    }
+
+    // Worst-case estimate: every chunk may gain up to (PayloadAlignment - 1)
+    // bytes of padding.
+    size_t TotalSize = 0;
+    for (const BlockStoreLocation& Location : ChunkLocations)
+    {
+        TotalSize += Location.Size;
+    }
+    size_t ChunkCount = ChunkLocations.size();
+    uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount);
+    uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
+    if (MaxRequiredBlockCount > MaxBlockCount)
+    {
+        ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
+                  BlocksBasePath,
+                  MaxRequiredBlockCount,
+                  MaxBlockCount);
+        return false;
+    }
+
+    constexpr const uint64_t DiskReserve = 1ul << 28;
+
+    // CleanSource reclaims source space as it goes, so only one block of
+    // headroom is needed; otherwise the full copy must fit.
+    if (CleanSource)
+    {
+        if (Space.Free < (MaxBlockSize + DiskReserve))
+        {
+            ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+                     BlocksBasePath,
+                     NiceBytes(MaxBlockSize + DiskReserve),
+                     NiceBytes(Space.Free));
+            return false;
+        }
+    }
+    else
+    {
+        if (Space.Free < (RequiredDiskSpace + DiskReserve))
+        {
+            ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
+                     BlocksBasePath,
+                     NiceBytes(RequiredDiskSpace + DiskReserve),
+                     NiceBytes(Space.Free));
+            return false;
+        }
+    }
+
+    // Start at the first block index with no file on disk yet.
+    uint32_t WriteBlockIndex = 0;
+    while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+    {
+        ++WriteBlockIndex;
+    }
+
+    BasicFile BlockFile;
+    BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
+
+    // Fast path: everything fits in one block, so just rename the source
+    // file into place and report unchanged offsets.
+    if (CleanSource && (MaxRequiredBlockCount < 2))
+    {
+        MovedChunksArray Chunks;
+        Chunks.reserve(ChunkCount);
+        for (size_t Index = 0; Index < ChunkCount; ++Index)
+        {
+            const BlockStoreLocation& ChunkLocation = ChunkLocations[Index];
+            Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}});
+        }
+        std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex);
+        CreateDirectories(BlockPath.parent_path());
+        BlockFile.Close();
+        std::filesystem::rename(SourceBlockFilePath, BlockPath);
+        Callback(Chunks);
+        return true;
+    }
+
+    // Visit chunks in source-file offset order.
+    ChunkIndexArray ChunkIndexes;
+    ChunkIndexes.reserve(ChunkCount);
+    for (size_t Index = 0; Index < ChunkCount; ++Index)
+    {
+        ChunkIndexes.push_back(Index);
+    }
+
+    std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) {
+        const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs];
+        const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs];
+        return LhsLocation.Offset < RhsLocation.Offset;
+    });
+
+    // Plan the split: greedily pack consecutive chunks into source-file
+    // ranges of at most MaxBlockSize bytes.
+    uint64_t BlockSize = 0;
+    uint64_t BlockOffset = 0;
+    // NOTE(review): NewLocations is never used below -- looks like leftover.
+    std::vector<BlockStoreLocation> NewLocations;
+    struct BlockData
+    {
+        MovedChunksArray Chunks;
+        uint64_t BlockOffset; // start of this block's range in the source file
+        uint64_t BlockSize;   // bytes of the source range covered
+        uint32_t BlockIndex;  // target block index on disk
+    };
+
+    std::vector<BlockData> BlockRanges;
+    MovedChunksArray Chunks;
+    BlockRanges.reserve(MaxRequiredBlockCount);
+    for (const size_t& ChunkIndex : ChunkIndexes)
+    {
+        const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex];
+
+        uint64_t ChunkOffset = LegacyChunkLocation.Offset;
+        uint64_t ChunkSize = LegacyChunkLocation.Size;
+        uint64_t ChunkEnd = ChunkOffset + ChunkSize;
+
+        if (BlockSize == 0)
+        {
+            BlockOffset = ChunkOffset;
+        }
+        if ((ChunkEnd - BlockOffset) > MaxBlockSize)
+        {
+            // Current block is full: record it and start the next one.
+            BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
+            BlockRange.Chunks.swap(Chunks);
+            BlockRanges.push_back(BlockRange);
+
+            WriteBlockIndex++;
+            while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
+            {
+                ++WriteBlockIndex;
+            }
+            BlockOffset = ChunkOffset;
+            BlockSize = 0;
+        }
+        // NOTE(review): this rounded value is overwritten two lines below
+        // (BlockSize = ChunkEnd - BlockOffset) without being read -- looks
+        // like a dead store; confirm whether intra-block alignment was
+        // intended here.
+        BlockSize = RoundUp(BlockSize, PayloadAlignment);
+        BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
+        Chunks.push_back({ChunkIndex, ChunkLocation});
+        BlockSize = ChunkEnd - BlockOffset;
+    }
+    if (BlockSize > 0)
+    {
+        BlockRanges.push_back(
+            {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
+    }
+
+    Stopwatch WriteBlockTimer;
+
+    // Copy blocks back-to-front so CleanSource can truncate the source file
+    // from the tail after each block is written out.
+    std::reverse(BlockRanges.begin(), BlockRanges.end());
+    std::vector<std::uint8_t> Buffer(1 << 28);
+    for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
+    {
+        const BlockData& BlockRange = BlockRanges[Idx];
+        if (Idx > 0)
+        {
+            // Progress/ETA estimate: "remaining" is the bytes still ahead of
+            // us in the (reversed) source file.
+            uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
+            uint64_t Completed = BlockOffset + BlockSize - Remaining;
+            uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
+
+            ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
+                     BlocksBasePath,
+                     Idx,
+                     BlockRanges.size(),
+                     NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
+                     NiceBytes(BlockOffset + BlockSize),
+                     NiceTimeSpanMs(ETA));
+        }
+
+        std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
+        BlockStoreFile ChunkBlock(BlockPath);
+        ChunkBlock.Create(BlockRange.BlockSize);
+        uint64_t Offset = 0;
+        while (Offset < BlockRange.BlockSize)
+        {
+            // Copy the range through a bounded scratch buffer.
+            uint64_t Size = BlockRange.BlockSize - Offset;
+            if (Size > Buffer.size())
+            {
+                Size = Buffer.size();
+            }
+            BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
+            ChunkBlock.Write(Buffer.data(), Size, Offset);
+            Offset += Size;
+        }
+        ChunkBlock.Truncate(Offset);
+        ChunkBlock.Flush();
+
+        Callback(BlockRange.Chunks);
+
+        if (CleanSource)
+        {
+            // Give the copied tail range back to the filesystem immediately.
+            BlockFile.SetFileSize(BlockRange.BlockOffset);
+        }
+    }
+    BlockFile.Close();
+
+    return true;
+}
+
+// File extension used for cas block files on disk.
+const char*
+BlockStore::GetBlockFileExtension()
+{
+    return ".ucas";
+}
+
+// Builds <BlocksBasePath>/<hhhh>/<hhhhhhhh>.ucas where hhhhhhhh is the block
+// index rendered as 8 hex digits and the sub-directory is its first four
+// digits, fanning files out over the directory tree.
+std::filesystem::path
+BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex)
+{
+    ExtendablePathBuilder<256> Path;
+
+    // 8 hex digits plus NUL terminator.
+    char BlockHexString[9];
+    ToHexNumber(BlockIndex, BlockHexString);
+
+    Path.Append(BlocksBasePath);
+    Path.AppendSeparator();
+    Path.AppendAsciiRange(BlockHexString, BlockHexString + 4);
+    Path.AppendSeparator();
+    Path.Append(BlockHexString);
+    Path.Append(GetBlockFileExtension());
+    return Path.ToPath();
+}
+
#if ZEN_WITH_TESTS
static bool
@@ -232,6 +1038,403 @@ TEST_CASE("blockstore.blockfile")
CHECK(!std::filesystem::exists(RootDirectory / "1"));
}
+namespace {
+    // Writes String into Store as a single chunk with the given payload
+    // alignment and checks the stored size matches the input length.
+    BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment)
+    {
+        BlockStoreLocation Location = Store.WriteChunk(String.data(), String.length(), PayloadAlignment);
+        CHECK(Location.Size == String.length());
+    }
+
+    // Reads the chunk at Location back from Store as a string. Returns an
+    // empty string if the block or the chunk cannot be retrieved.
+    std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location)
+    {
+        Ref<BlockStoreFile> ChunkBlock(Store.GetChunkBlock(Location));
+        if (!ChunkBlock)
+        {
+            return "";
+        }
+        IoBuffer ChunkData = ChunkBlock->GetChunk(Location.Offset, Location.Size);
+        if (!ChunkData)
+        {
+            return "";
+        }
+        std::string AsString((const char*)ChunkData.Data(), ChunkData.Size());
+        return AsString;
+    }
+
+    // Recursively collects paths below RootDir. Files/Directories select
+    // which entry kinds are included. Takes the root by const reference
+    // instead of by value to avoid an unnecessary path copy.
+    std::vector<std::filesystem::path> GetDirectoryContent(const std::filesystem::path& RootDir, bool Files, bool Directories)
+    {
+        FileSystemTraversal Traversal;
+        struct Visitor : public FileSystemTraversal::TreeVisitor
+        {
+            virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t) override
+            {
+                if (Files)
+                {
+                    Items.push_back(Parent / File);
+                }
+            }
+
+            virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& Dir) override
+            {
+                if (Directories)
+                {
+                    Items.push_back(Parent / Dir);
+                }
+                return true;
+            }
+
+            bool Files;
+            bool Directories;
+            std::vector<std::filesystem::path> Items;
+        } Visit;
+        Visit.Files = Files;
+        Visit.Directories = Directories;
+
+        Traversal.TraverseFileSystem(RootDir, Visit);
+        return Visit.Items;
+    }
+
+    // Creates an IoBuffer of Size bytes holding a shuffled byte ramp:
+    // deterministic content statistics, randomized ordering.
+    // ('static' dropped: redundant inside an anonymous namespace.)
+    IoBuffer CreateChunk(uint64_t Size)
+    {
+        static std::random_device rd;
+        static std::mt19937 g(rd());
+
+        std::vector<uint8_t> Values;
+        Values.resize(Size);
+        for (size_t Idx = 0; Idx < Size; ++Idx)
+        {
+            Values[Idx] = static_cast<uint8_t>(Idx);
+        }
+        std::shuffle(Values.begin(), Values.end(), g);
+
+        return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size());
+    }
+} // namespace
+
+// Basic round-trip test: write small chunks, read them back, and verify a
+// chunk that overflows the 128-byte block limit lands in a new block.
+TEST_CASE("blockstore.chunks")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto StoreRoot = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(StoreRoot, 128, 1024, {});
+
+    // Looking up a chunk in a block that was never written must fail cleanly.
+    Ref<BlockStoreFile> MissingChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512});
+    CHECK(!MissingChunk);
+
+    const std::string DataA = "This is the data of the first chunk that we will write";
+    const std::string DataB = "This is the data for the second chunk that we will write";
+    BlockStoreLocation LocationA = WriteStringAsChunk(Store, DataA, 4);
+    BlockStoreLocation LocationB = WriteStringAsChunk(Store, DataB, 4);
+
+    CHECK(ReadChunkAsString(Store, LocationA) == DataA);
+    CHECK(ReadChunkAsString(Store, LocationB) == DataB);
+
+    // This chunk exceeds the remaining capacity of the current block, so the
+    // store must place it in a different block.
+    const std::string DataC =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    BlockStoreLocation LocationC = WriteStringAsChunk(Store, DataC, 4);
+    CHECK(LocationC.BlockIndex != LocationA.BlockIndex);
+
+    // Earlier chunks stay readable after a new block has been opened.
+    CHECK(ReadChunkAsString(Store, LocationA) == DataA);
+    CHECK(ReadChunkAsString(Store, LocationB) == DataB);
+    CHECK(ReadChunkAsString(Store, LocationC) == DataC);
+}
+
+// Verifies that re-initializing a store with a set of known locations deletes
+// block files that none of those locations reference.
+TEST_CASE("blockstore.clean.stray.blocks")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", 128, 1024, {});
+
+    // The first two chunks fit in the first 128-byte block; the third is too
+    // large and spills into a second block file.
+    std::string FirstChunkData = "This is the data of the first chunk that we will write";
+    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+    std::string SecondChunkData = "This is the data for the second chunk that we will write";
+    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+    std::string ThirdChunkData =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    WriteStringAsChunk(Store, ThirdChunkData, 4);
+
+    Store.Close();
+
+    // Not referencing any chunk in the second block means that it should be deleted
+    Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation});
+
+    // Only the first block file should remain on disk.
+    CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1);
+}
+
+// Verifies that Flush() closes out the active block so the next write starts
+// a fresh block file: three writes with flushes in between yield three files.
+TEST_CASE("blockstore.flush.forces.new.block")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", 128, 1024, {});
+
+    // The returned locations are not needed here, so they are not stored
+    // (the originals were assigned but never read).
+    std::string FirstChunkData = "This is the data of the first chunk that we will write";
+    WriteStringAsChunk(Store, FirstChunkData, 4);
+    Store.Flush();
+    std::string SecondChunkData = "This is the data for the second chunk that we will write";
+    WriteStringAsChunk(Store, SecondChunkData, 4);
+    Store.Flush();
+    std::string ThirdChunkData =
+        "This is a much longer string that will not fit in the first block so it should be placed in the second block";
+    WriteStringAsChunk(Store, ThirdChunkData, 4);
+
+    // One block file per flushed write, plus one for the final write.
+    CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3);
+}
+
+// Exercises IterateChunks: small chunks arrive through the in-memory data
+// callback, while a chunk larger than the small-chunk window is handed to the
+// block-file callback for streaming.
+TEST_CASE("blockstore.iterate.chunks")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {});
+    Ref<BlockStoreFile> BadChunk = Store.GetChunkBlock({.BlockIndex = 0, .Offset = 0, .Size = 512});
+    CHECK(!BadChunk);
+
+    std::string FirstChunkData = "This is the data of the first chunk that we will write";
+    BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4);
+
+    std::string SecondChunkData = "This is the data for the second chunk that we will write";
+    BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4);
+    Store.Flush();
+
+    // Twice the small-chunk window: guaranteed to take the streaming path.
+    std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L');
+    BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4);
+
+    Store.IterateChunks(
+        {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation},
+        [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+            CHECK(Data);
+            CHECK(Size > 0);
+            std::string AsString((const char*)Data, Size);
+            switch (ChunkIndex)
+            {
+                case 0:
+                    CHECK(AsString == FirstChunkData);
+                    break;
+                case 1:
+                    CHECK(AsString == SecondChunkData);
+                    break;
+                default:
+                    // Only the two small chunks may arrive through this callback.
+                    CHECK(false);
+                    break;
+            }
+        },
+        [&](size_t ChunkIndex, Ref<BlockStoreFile> BlockFile, uint64_t Offset, uint64_t Size) {
+            CHECK(BlockFile);
+            CHECK(ChunkIndex == 2);
+            CHECK(Offset == VeryLargeChunkLocation.Offset);
+            CHECK(Size == VeryLargeChunkLocation.Size);
+            size_t StreamOffset = 0;
+            // Fix: advance StreamOffset per section so each streamed section is
+            // compared against the matching range of the source data (the old
+            // code always compared against offset 0, which only passed because
+            // the payload is a single repeated byte). The inner parameter is
+            // also renamed so it no longer shadows the outer 'Size'.
+            BlockFile->StreamByteRange(Offset, Size, [&](const void* Data, size_t SectionSize) {
+                const char* VeryLargeChunkSection = &(VeryLargeChunk.data()[StreamOffset]);
+                CHECK(memcmp(VeryLargeChunkSection, Data, SectionSize) == 0);
+                StreamOffset += SectionSize;
+            });
+            // The stream must deliver the entire requested range.
+            CHECK(StreamOffset == Size);
+        });
+}
+
+// Exercises ReclaimSpace end-to-end: a keep-everything pass is a no-op, a pass
+// dropping a prefix of the chunks compacts the store (reporting moved and
+// deleted chunks), survivors keep their content, and a keep-nothing pass
+// deletes the remainder without moving anything.
+TEST_CASE("blockstore.reclaim.space")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", 512, 1024, {});
+
+    constexpr size_t ChunkCount = 200;
+    constexpr size_t Alignment = 8;
+    // Write ChunkCount random chunks of increasing size, remembering each
+    // chunk's location and a content hash for later verification.
+    std::vector<BlockStoreLocation> ChunkLocations;
+    std::vector<IoHash> ChunkHashes;
+    ChunkLocations.reserve(ChunkCount);
+    ChunkHashes.reserve(ChunkCount);
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        IoBuffer Chunk = CreateChunk(57 + ChunkIndex);
+        ChunkLocations.push_back(Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment));
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    // First pass: keep every chunk.
+    std::vector<size_t> ChunksToKeep;
+    ChunksToKeep.reserve(ChunkLocations.size());
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        ChunksToKeep.push_back(ChunkIndex);
+    }
+
+    Store.Flush();
+    BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState();
+    Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true);
+
+    // If we keep all the chunks we should not get any callbacks on moved/deleted stuff
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        ChunksToKeep,
+        Alignment,
+        false,
+        [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+
+    // Second pass: drop the first DeleteChunkCount chunks, keep the rest.
+    size_t DeleteChunkCount = 38;
+    ChunksToKeep.clear();
+    for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        ChunksToKeep.push_back(ChunkIndex);
+    }
+
+    std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations;
+    size_t MovedChunkCount = 0;
+    size_t DeletedChunkCount = 0;
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        ChunksToKeep,
+        Alignment,
+        false,
+        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+            // Only kept chunks may move; only dropped chunks may be deleted.
+            for (const auto& MovedChunk : MovedChunks)
+            {
+                CHECK(MovedChunk.first >= DeleteChunkCount);
+                NewChunkLocations[MovedChunk.first] = MovedChunk.second;
+            }
+            MovedChunkCount += MovedChunks.size();
+            for (size_t DeletedIndex : DeletedChunks)
+            {
+                CHECK(DeletedIndex < DeleteChunkCount);
+            }
+            DeletedChunkCount += DeletedChunks.size();
+        },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+    CHECK(MovedChunkCount <= DeleteChunkCount);
+    CHECK(DeletedChunkCount == DeleteChunkCount);
+    ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end());
+
+    // Every surviving chunk must still hash to its original content.
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(NewChunkLocations[ChunkIndex]);
+        if (ChunkIndex >= DeleteChunkCount)
+        {
+            CHECK(ChunkBlock);
+            IoBuffer VerifyChunk = ChunkBlock->GetChunk(NewChunkLocations[ChunkIndex].Offset, NewChunkLocations[ChunkIndex].Size);
+            CHECK(VerifyChunk);
+            IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+            CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+        }
+    }
+
+    // Third pass: keep nothing — everything left is deleted, nothing moves.
+    NewChunkLocations = ChunkLocations;
+    MovedChunkCount = 0;
+    DeletedChunkCount = 0;
+    Store.ReclaimSpace(
+        State1,
+        ChunkLocations,
+        {},
+        Alignment,
+        false,
+        [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) {
+            CHECK(MovedChunks.empty());
+            DeletedChunkCount += DeletedChunks.size();
+        },
+        []() {
+            CHECK(false);
+            return 0;
+        });
+    CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount);
+}
+
+// Concurrency smoke test: drives the store from a worker-thread pool in three
+// phases — concurrent writes, concurrent reads, then interleaved writes and
+// reads — verifying every chunk's content hash round-trips.
+TEST_CASE("blockstore.thread.read.write")
+{
+    ScopedTemporaryDirectory TempDir;
+    auto RootDirectory = TempDir.Path();
+
+    BlockStore Store;
+    Store.Initialize(RootDirectory / "store", 1088, 1024, {});
+
+    constexpr size_t ChunkCount = 1000;
+    constexpr size_t Alignment = 8;
+    // Pre-build payloads and their hashes on the main thread so the worker
+    // tasks exercise only the store itself.
+    std::vector<IoBuffer> Chunks;
+    std::vector<IoHash> ChunkHashes;
+    Chunks.reserve(ChunkCount);
+    ChunkHashes.reserve(ChunkCount);
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2);
+        Chunks.push_back(Chunk);
+        ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
+    }
+
+    std::vector<BlockStoreLocation> ChunkLocations;
+    ChunkLocations.resize(ChunkCount);
+
+    // Phase 1: concurrent writes only. Each task writes a distinct index, so
+    // ChunkLocations needs no locking. Alignment is constexpr and therefore
+    // usable inside the lambdas without being captured.
+    WorkerThreadPool WorkerPool(8);
+    std::atomic<size_t> WorkCompleted = 0;
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() {
+            IoBuffer& Chunk = Chunks[ChunkIndex];
+            ChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    // NOTE(review): polling with Sleep keeps the test simple; acceptable here,
+    // but production code would use a latch/condition variable instead.
+    while (WorkCompleted < Chunks.size())
+    {
+        Sleep(1);
+    }
+
+    // Phase 2: concurrent reads only; verify each chunk hash round-trips.
+    WorkCompleted = 0;
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+            Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]);
+            CHECK(ChunkBlock);
+            IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size);
+            CHECK(VerifyChunk);
+            IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+            CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    while (WorkCompleted < Chunks.size())
+    {
+        Sleep(1);
+    }
+
+    // Phase 3: interleave fresh writes (to new locations) with reads of the
+    // phase-1 locations, running concurrently on the same pool.
+    std::vector<BlockStoreLocation> SecondChunkLocations;
+    SecondChunkLocations.resize(ChunkCount);
+    WorkCompleted = 0;
+    for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
+    {
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() {
+            IoBuffer& Chunk = Chunks[ChunkIndex];
+            SecondChunkLocations[ChunkIndex] = Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment);
+            WorkCompleted.fetch_add(1);
+        });
+        WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() {
+            Ref<BlockStoreFile> ChunkBlock = Store.GetChunkBlock(ChunkLocations[ChunkIndex]);
+            CHECK(ChunkBlock);
+            IoBuffer VerifyChunk = ChunkBlock->GetChunk(ChunkLocations[ChunkIndex].Offset, ChunkLocations[ChunkIndex].Size);
+            CHECK(VerifyChunk);
+            IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size());
+            CHECK(VerifyHash == ChunkHashes[ChunkIndex]);
+            WorkCompleted.fetch_add(1);
+        });
+    }
+    // Two tasks per chunk in this phase, hence the doubled target count.
+    while (WorkCompleted < Chunks.size() * 2)
+    {
+        Sleep(1);
+    }
+}
+
#endif
void