diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenstore/blockstore.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenstore/blockstore.cpp')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 1312 |
1 files changed, 1312 insertions, 0 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp new file mode 100644 index 000000000..5dfa10c91 --- /dev/null +++ b/src/zenstore/blockstore.cpp @@ -0,0 +1,1312 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/blockstore.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> + +#include <algorithm> + +#if ZEN_WITH_TESTS +# include <zencore/compactbinarybuilder.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +# include <zencore/workthreadpool.h> +# include <random> +#endif + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +////////////////////////////////////////////////////////////////////////// + +BlockStoreFile::BlockStoreFile(const std::filesystem::path& BlockPath) : m_Path(BlockPath) +{ +} + +BlockStoreFile::~BlockStoreFile() +{ + m_IoBuffer = IoBuffer(); + m_File.Detach(); +} + +const std::filesystem::path& +BlockStoreFile::GetPath() const +{ + return m_Path; +} + +void +BlockStoreFile::Open() +{ + m_File.Open(m_Path, BasicFile::Mode::kDelete); + void* FileHandle = m_File.Handle(); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); +} + +void +BlockStoreFile::Create(uint64_t InitialSize) +{ + auto ParentPath = m_Path.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + CreateDirectories(ParentPath); + } + + m_File.Open(m_Path, BasicFile::Mode::kTruncateDelete); + void* FileHandle = m_File.Handle(); + + // We map our m_IoBuffer beyond the file size as we will grow it over time and want + // to be able to create sub-buffers of all the written range later + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); +} + +uint64_t +BlockStoreFile::FileSize() +{ + return m_File.FileSize(); +} + +void +BlockStoreFile::MarkAsDeleteOnClose() +{ + m_IoBuffer.MarkAsDeleteOnClose(); +} + +IoBuffer +BlockStoreFile::GetChunk(uint64_t Offset, uint64_t Size) +{ + return IoBuffer(m_IoBuffer, Offset, Size); +} + +void +BlockStoreFile::Read(void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Read(Data, Size, FileOffset); +} + +void +BlockStoreFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset) +{ + m_File.Write(Data, Size, FileOffset); +} + +void +BlockStoreFile::Flush() +{ + m_File.Flush(); +} + +BasicFile& +BlockStoreFile::GetBasicFile() +{ + return m_File; +} + +void +BlockStoreFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) +{ + m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun)); +} + +constexpr uint64_t ScrubSmallChunkWindowSize = 4 * 1024 * 1024; + +void +BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, + uint64_t MaxBlockSize, + uint64_t MaxBlockCount, + const std::vector<BlockStoreLocation>& KnownLocations) +{ + ZEN_ASSERT(MaxBlockSize > 0); + ZEN_ASSERT(MaxBlockCount > 0); + ZEN_ASSERT(IsPow2(MaxBlockCount)); + + m_TotalSize = 0; + m_BlocksBasePath = BlocksBasePath; + m_MaxBlockSize = MaxBlockSize; + + m_ChunkBlocks.clear(); + + std::unordered_set<uint32_t> KnownBlocks; + for (const auto& Entry : KnownLocations) + { + KnownBlocks.insert(Entry.BlockIndex); + } + + if (std::filesystem::is_directory(m_BlocksBasePath)) + { + std::vector<std::filesystem::path> FoldersToScan; + FoldersToScan.push_back(m_BlocksBasePath); + size_t FolderOffset = 0; + while (FolderOffset < FoldersToScan.size()) + { + for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(FoldersToScan[FolderOffset])) + { + if (Entry.is_directory()) + { + FoldersToScan.push_back(Entry.path()); + continue; + } + if (Entry.is_regular_file()) + { + const std::filesystem::path Path = Entry.path(); + if (Path.extension() != GetBlockFileExtension()) + { + continue; + } + std::string FileName = PathToUtf8(Path.stem()); + uint32_t BlockIndex; + bool OK = ParseHexNumber(FileName, BlockIndex); + if (!OK) + { + continue; + } + if (!KnownBlocks.contains(BlockIndex)) + { + // Log removing unreferenced block + // Clear out unused blocks + ZEN_DEBUG("removing unused block at '{}'", Path); + std::error_code Ec; + std::filesystem::remove(Path, Ec); + if (Ec) + { + ZEN_WARN("Failed to delete file '{}' reason: '{}'", Path, Ec.message()); + } + continue; + } + Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; + BlockFile->Open(); + m_TotalSize.fetch_add(BlockFile->FileSize(), std::memory_order::relaxed); + m_ChunkBlocks[BlockIndex] = BlockFile; + } + } + ++FolderOffset; + } + } + else + { + CreateDirectories(m_BlocksBasePath); + } +} + +void +BlockStore::Close() +{ + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + m_WriteBlock = nullptr; + m_CurrentInsertOffset = 0; + m_WriteBlockIndex = 0; + + m_ChunkBlocks.clear(); + m_BlocksBasePath.clear(); +} + +void +BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, const WriteChunkCallback& Callback) +{ + ZEN_ASSERT(Data != nullptr); + ZEN_ASSERT(Size > 0u); + ZEN_ASSERT(Size <= m_MaxBlockSize); + ZEN_ASSERT(Alignment > 0u); + + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + bool IsWriting = !!m_WriteBlock; + if (!IsWriting || (m_CurrentInsertOffset + Size) > m_MaxBlockSize) + { + if (m_WriteBlock) + { + m_WriteBlock = nullptr; + } + + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); + } + + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + + Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); + NewBlockFile->Create(m_MaxBlockSize); + + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; + m_WriteBlock = NewBlockFile; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } + uint64_t InsertOffset = m_CurrentInsertOffset; + m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); + uint64_t AlignedWriteSize = m_CurrentInsertOffset - InsertOffset; + Ref<BlockStoreFile> WriteBlock = m_WriteBlock; + m_ActiveWriteBlocks.push_back(WriteBlockIndex); + InsertLock.ReleaseNow(); + + WriteBlock->Write(Data, Size, InsertOffset); + m_TotalSize.fetch_add(AlignedWriteSize, std::memory_order::relaxed); + + Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size}); + + { + RwLock::ExclusiveLockScope _(m_InsertLock); + m_ActiveWriteBlocks.erase(std::find(m_ActiveWriteBlocks.begin(), m_ActiveWriteBlocks.end(), WriteBlockIndex)); + } +} + +BlockStore::ReclaimSnapshotState +BlockStore::GetReclaimSnapshotState() +{ + ReclaimSnapshotState State; + RwLock::SharedLockScope _(m_InsertLock); + for (uint32_t BlockIndex : m_ActiveWriteBlocks) + { + State.m_ActiveWriteBlocks.insert(BlockIndex); + } + if (m_WriteBlock) + { + State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); + } + State.BlockCount = m_ChunkBlocks.size(); + return State; +} + +IoBuffer +BlockStore::TryGetChunk(const BlockStoreLocation& Location) const +{ + RwLock::SharedLockScope InsertLock(m_InsertLock); + if (auto BlockIt = m_ChunkBlocks.find(Location.BlockIndex); BlockIt != m_ChunkBlocks.end()) + { + if (const Ref<BlockStoreFile>& Block = BlockIt->second; Block) + { + return Block->GetChunk(Location.Offset, Location.Size); + } + } + return IoBuffer(); +} + +void +BlockStore::Flush() +{ + RwLock::ExclusiveLockScope _(m_InsertLock); + if (m_CurrentInsertOffset > 0) + { + uint32_t WriteBlockIndex = m_WriteBlockIndex.load(std::memory_order_acquire); + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + m_WriteBlock = nullptr; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + m_CurrentInsertOffset = 0; + } +} + +void +BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot, + const std::vector<BlockStoreLocation>& ChunkLocations, + const ChunkIndexArray& KeepChunkIndexes, + uint64_t PayloadAlignment, + bool DryRun, + const ReclaimCallback& ChangeCallback, + const ClaimDiskReserveCallback& DiskReserveCallback) +{ + if (ChunkLocations.empty()) + { + return; + } + uint64_t WriteBlockTimeUs = 0; + uint64_t WriteBlockLongestTimeUs = 0; + uint64_t ReadBlockTimeUs = 0; + uint64_t ReadBlockLongestTimeUs = 0; + uint64_t TotalChunkCount = ChunkLocations.size(); + uint64_t DeletedSize = 0; + uint64_t OldTotalSize = 0; + uint64_t NewTotalSize = 0; + + uint64_t MovedCount = 0; + uint64_t DeletedCount = 0; + + Stopwatch TotalTimer; + const auto _ = MakeGuard([&] { + ZEN_DEBUG( + "reclaim space for '{}' DONE after {}, write lock: {} ({}), read lock: {} ({}), collected {} bytes, deleted {} and moved " + "{} " + "of {} " + "chunks ({}).", + m_BlocksBasePath, + NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()), + NiceLatencyNs(WriteBlockTimeUs), + NiceLatencyNs(WriteBlockLongestTimeUs), + NiceLatencyNs(ReadBlockTimeUs), + NiceLatencyNs(ReadBlockLongestTimeUs), + NiceBytes(DeletedSize), + DeletedCount, + MovedCount, + TotalChunkCount, + NiceBytes(OldTotalSize)); + }); + + size_t BlockCount = Snapshot.BlockCount; + if (BlockCount == 0) + { + ZEN_DEBUG("garbage collect for '{}' SKIPPED, no blocks to process", m_BlocksBasePath); + return; + } + + std::unordered_set<size_t> KeepChunkMap; + KeepChunkMap.reserve(KeepChunkIndexes.size()); + for (size_t KeepChunkIndex : KeepChunkIndexes) + { + KeepChunkMap.insert(KeepChunkIndex); + } + + std::unordered_map<uint32_t, size_t> BlockIndexToChunkMapIndex; + std::vector<ChunkIndexArray> BlockKeepChunks; + std::vector<ChunkIndexArray> BlockDeleteChunks; + + BlockIndexToChunkMapIndex.reserve(BlockCount); + BlockKeepChunks.reserve(BlockCount); + BlockDeleteChunks.reserve(BlockCount); + size_t GuesstimateCountPerBlock = TotalChunkCount / BlockCount / 2; + + size_t DeleteCount = 0; + for (size_t Index = 0; Index < TotalChunkCount; ++Index) + { + const BlockStoreLocation& Location = ChunkLocations[Index]; + OldTotalSize += Location.Size; + if (Snapshot.m_ActiveWriteBlocks.contains(Location.BlockIndex)) + { + continue; + } + + auto BlockIndexPtr = BlockIndexToChunkMapIndex.find(Location.BlockIndex); + size_t ChunkMapIndex = 0; + if (BlockIndexPtr == BlockIndexToChunkMapIndex.end()) + { + ChunkMapIndex = BlockKeepChunks.size(); + BlockIndexToChunkMapIndex[Location.BlockIndex] = ChunkMapIndex; + BlockKeepChunks.resize(ChunkMapIndex + 1); + BlockKeepChunks.back().reserve(GuesstimateCountPerBlock); + BlockDeleteChunks.resize(ChunkMapIndex + 1); + BlockDeleteChunks.back().reserve(GuesstimateCountPerBlock); + } + else + { + ChunkMapIndex = BlockIndexPtr->second; + } + + if (KeepChunkMap.contains(Index)) + { + ChunkIndexArray& IndexMap = BlockKeepChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + NewTotalSize += Location.Size; + continue; + } + ChunkIndexArray& IndexMap = BlockDeleteChunks[ChunkMapIndex]; + IndexMap.push_back(Index); + DeleteCount++; + } + + std::unordered_set<uint32_t> BlocksToReWrite; + BlocksToReWrite.reserve(BlockIndexToChunkMapIndex.size()); + for (const auto& Entry : BlockIndexToChunkMapIndex) + { + uint32_t BlockIndex = Entry.first; + size_t ChunkMapIndex = Entry.second; + const ChunkIndexArray& ChunkMap = BlockDeleteChunks[ChunkMapIndex]; + if (ChunkMap.empty()) + { + continue; + } + BlocksToReWrite.insert(BlockIndex); + } + + if (DryRun) + { + ZEN_DEBUG("garbage collect for '{}' DISABLED, found {} {} chunks of total {} {}", + m_BlocksBasePath, + DeleteCount, + NiceBytes(OldTotalSize - NewTotalSize), + TotalChunkCount, + OldTotalSize); + return; + } + + Ref<BlockStoreFile> NewBlockFile; + try + { + uint64_t WriteOffset = 0; + uint32_t NewBlockIndex = 0; + for (uint32_t BlockIndex : BlocksToReWrite) + { + const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex]; + + Ref<BlockStoreFile> OldBlockFile; + { + RwLock::SharedLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + WriteBlockTimeUs += ElapsedUs; + WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); + }); + OldBlockFile = m_ChunkBlocks[BlockIndex]; + } + + if (!OldBlockFile) + { + // If the block file pointed to does not exist, move them all to deleted list + BlockDeleteChunks[ChunkMapIndex].insert(BlockDeleteChunks[ChunkMapIndex].end(), + BlockKeepChunks[ChunkMapIndex].begin(), + BlockKeepChunks[ChunkMapIndex].end()); + BlockKeepChunks[ChunkMapIndex].clear(); + } + + const ChunkIndexArray& KeepMap = BlockKeepChunks[ChunkMapIndex]; + if (KeepMap.empty()) + { + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + ChangeCallback({}, DeleteMap); + DeletedCount += DeleteMap.size(); + { + RwLock::ExclusiveLockScope _i(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (OldBlockFile) + { + m_ChunkBlocks[BlockIndex] = nullptr; + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); + } + } + continue; + } + + ZEN_ASSERT(OldBlockFile); + + MovedChunksArray MovedChunks; + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepMap) + { + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + + if (!NewBlockFile || (WriteOffset + Chunk.size() > m_MaxBlockSize)) + { + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); + + if (NewBlockFile) + { + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + { + ChangeCallback(MovedChunks, {}); + MovedCount += KeepMap.size(); + MovedChunks.clear(); + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + if (m_ChunkBlocks.size() == m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return; + } + while (m_ChunkBlocks.contains(NextBlockIndex)) + { + NextBlockIndex = (NextBlockIndex + 1) & (m_MaxBlockCount - 1); + } + std::filesystem::path NewBlockPath = GetBlockPath(m_BlocksBasePath, NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; + } + + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) + { + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); + return; + } + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + RwLock::ExclusiveLockScope _l(m_InsertLock); + Stopwatch Timer; + const auto __ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks.erase(NextBlockIndex); + return; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; + } + + NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset); + MovedChunks.push_back({ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = WriteOffset, .Size = Chunk.size()}}); + uint64_t OldOffset = WriteOffset; + WriteOffset = RoundUp(WriteOffset + Chunk.size(), PayloadAlignment); + m_TotalSize.fetch_add(WriteOffset - OldOffset, std::memory_order::relaxed); + } + Chunk.clear(); + if (NewBlockFile) + { + NewBlockFile->Flush(); + NewBlockFile = nullptr; + } + + const ChunkIndexArray& DeleteMap = BlockDeleteChunks[ChunkMapIndex]; + for (size_t DeleteIndex : DeleteMap) + { + DeletedSize += ChunkLocations[DeleteIndex].Size; + } + + ChangeCallback(MovedChunks, DeleteMap); + MovedCount += KeepMap.size(); + DeletedCount += DeleteMap.size(); + MovedChunks.clear(); + { + RwLock::ExclusiveLockScope __(m_InsertLock); + Stopwatch Timer; + const auto ___ = MakeGuard([&] { + uint64_t ElapsedUs = Timer.GetElapsedTimeUs(); + ReadBlockTimeUs += ElapsedUs; + ReadBlockLongestTimeUs = std::max(ElapsedUs, ReadBlockLongestTimeUs); + }); + m_ChunkBlocks[BlockIndex] = nullptr; + ZEN_DEBUG("marking cas block store file '{}' for delete, block #{}", OldBlockFile->GetPath(), BlockIndex); + m_TotalSize.fetch_sub(OldBlockFile->FileSize(), std::memory_order::relaxed); + OldBlockFile->MarkAsDeleteOnClose(); + } + } + } + catch (std::exception& ex) + { + ZEN_ERROR("reclaiming space for '{}' failed with: '{}'", m_BlocksBasePath, ex.what()); + if (NewBlockFile) + { + ZEN_DEBUG("dropping incomplete cas block store file '{}'", NewBlockFile->GetPath()); + m_TotalSize.fetch_sub(NewBlockFile->FileSize(), std::memory_order::relaxed); + NewBlockFile->MarkAsDeleteOnClose(); + } + } +} + +void +BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, + const IterateChunksSmallSizeCallback& SmallSizeCallback, + const IterateChunksLargeSizeCallback& LargeSizeCallback) +{ + std::vector<size_t> LocationIndexes; + LocationIndexes.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkLocations.size(); ++ChunkIndex) + { + LocationIndexes.push_back(ChunkIndex); + } + std::sort(LocationIndexes.begin(), LocationIndexes.end(), [&](size_t IndexA, size_t IndexB) -> bool { + const BlockStoreLocation& LocationA = ChunkLocations[IndexA]; + const BlockStoreLocation& LocationB = ChunkLocations[IndexB]; + if (LocationA.BlockIndex < LocationB.BlockIndex) + { + return true; + } + else if (LocationA.BlockIndex > LocationB.BlockIndex) + { + return false; + } + return LocationA.Offset < LocationB.Offset; + }); + + IoBuffer ReadBuffer{ScrubSmallChunkWindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_InsertLock); + + auto GetNextRange = [&](size_t StartIndexOffset) { + size_t ChunkCount = 0; + size_t StartIndex = LocationIndexes[StartIndexOffset]; + const BlockStoreLocation& StartLocation = ChunkLocations[StartIndex]; + uint64_t StartOffset = StartLocation.Offset; + while (StartIndexOffset + ChunkCount < LocationIndexes.size()) + { + size_t NextIndex = LocationIndexes[StartIndexOffset + ChunkCount]; + const BlockStoreLocation& Location = ChunkLocations[NextIndex]; + if (Location.BlockIndex != StartLocation.BlockIndex) + { + break; + } + if ((Location.Offset + Location.Size) - StartOffset > ScrubSmallChunkWindowSize) + { + break; + } + ++ChunkCount; + } + return ChunkCount; + }; + + size_t LocationIndexOffset = 0; + while (LocationIndexOffset < LocationIndexes.size()) + { + size_t ChunkIndex = LocationIndexes[LocationIndexOffset]; + const BlockStoreLocation& FirstLocation = ChunkLocations[ChunkIndex]; + + const Ref<BlockStoreFile>& BlockFile = m_ChunkBlocks[FirstLocation.BlockIndex]; + if (!BlockFile) + { + while (ChunkLocations[ChunkIndex].BlockIndex == FirstLocation.BlockIndex) + { + SmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; + if (LocationIndexOffset == LocationIndexes.size()) + { + break; + } + ChunkIndex = LocationIndexes[LocationIndexOffset]; + } + continue; + } + size_t BlockSize = BlockFile->FileSize(); + size_t RangeCount = GetNextRange(LocationIndexOffset); + if (RangeCount > 0) + { + size_t LastChunkIndex = LocationIndexes[LocationIndexOffset + RangeCount - 1]; + const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; + uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + BlockFile->Read(BufferBase, Size, FirstLocation.Offset); + for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) + { + size_t NextChunkIndex = LocationIndexes[LocationIndexOffset + RangeIndex]; + const BlockStoreLocation& ChunkLocation = ChunkLocations[NextChunkIndex]; + if (ChunkLocation.Size == 0 || (ChunkLocation.Offset + ChunkLocation.Size > BlockSize)) + { + SmallSizeCallback(NextChunkIndex, nullptr, 0); + continue; + } + void* BufferPtr = &((char*)BufferBase)[ChunkLocation.Offset - FirstLocation.Offset]; + SmallSizeCallback(NextChunkIndex, BufferPtr, ChunkLocation.Size); + } + LocationIndexOffset += RangeCount; + continue; + } + if (FirstLocation.Size == 0 || (FirstLocation.Offset + FirstLocation.Size > BlockSize)) + { + SmallSizeCallback(ChunkIndex, nullptr, 0); + LocationIndexOffset++; + continue; + } + LargeSizeCallback(ChunkIndex, *BlockFile.Get(), FirstLocation.Offset, FirstLocation.Size); + LocationIndexOffset++; + } +} + +const char* +BlockStore::GetBlockFileExtension() +{ + return ".ucas"; +} + +std::filesystem::path +BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex) +{ + ExtendablePathBuilder<256> Path; + + char BlockHexString[9]; + ToHexNumber(BlockIndex, BlockHexString); + + Path.Append(BlocksBasePath); + Path.AppendSeparator(); + Path.AppendAsciiRange(BlockHexString, BlockHexString + 4); + Path.AppendSeparator(); + Path.Append(BlockHexString); + Path.Append(GetBlockFileExtension()); + return Path.ToPath(); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("blockstore.blockstoredisklocation") +{ + BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; + CHECK(Zero == BlockStoreDiskLocation(Zero, 4).Get(4)); + + BlockStoreLocation MaxBlockIndex = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0}; + CHECK(MaxBlockIndex == BlockStoreDiskLocation(MaxBlockIndex, 4).Get(4)); + + BlockStoreLocation MaxOffset = BlockStoreLocation{.BlockIndex = 0, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxOffset == BlockStoreDiskLocation(MaxOffset, 4).Get(4)); + + BlockStoreLocation MaxSize = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxSize == BlockStoreDiskLocation(MaxSize, 4).Get(4)); + + BlockStoreLocation MaxBlockIndexAndOffset = + BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, .Offset = BlockStoreDiskLocation::MaxOffset * 4, .Size = 0}; + CHECK(MaxBlockIndexAndOffset == BlockStoreDiskLocation(MaxBlockIndexAndOffset, 4).Get(4)); + + BlockStoreLocation MaxAll = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll == BlockStoreDiskLocation(MaxAll, 4).Get(4)); + + BlockStoreLocation MaxAll4096 = BlockStoreLocation{.BlockIndex = BlockStoreDiskLocation::MaxBlockIndex, + .Offset = BlockStoreDiskLocation::MaxOffset * 4096, + .Size = std::numeric_limits<uint32_t>::max()}; + CHECK(MaxAll4096 == BlockStoreDiskLocation(MaxAll4096, 4096).Get(4096)); + + BlockStoreLocation Middle = BlockStoreLocation{.BlockIndex = (BlockStoreDiskLocation::MaxBlockIndex) / 2, + .Offset = ((BlockStoreDiskLocation::MaxOffset) / 2) * 4, + .Size = std::numeric_limits<uint32_t>::max() / 2}; + CHECK(Middle == BlockStoreDiskLocation(Middle, 4).Get(4)); +} + +TEST_CASE("blockstore.blockfile") +{ + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path() / "blocks"; + CreateDirectories(RootDirectory); + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Create(16384); + CHECK(File1.FileSize() == 0); + File1.Write("data", 5, 0); + IoBuffer DataChunk = File1.GetChunk(0, 5); + File1.Write("boop", 5, 5); + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + File1.Flush(); + CHECK(File1.FileSize() == 10); + } + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + + char DataRaw[5]; + File1.Read(DataRaw, 5, 0); + CHECK(std::string(DataRaw) == "data"); + IoBuffer DataChunk = File1.GetChunk(0, 5); + + char BoopRaw[5]; + File1.Read(BoopRaw, 5, 5); + CHECK(std::string(BoopRaw) == "boop"); + + IoBuffer BoopChunk = File1.GetChunk(5, 5); + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + CHECK(std::filesystem::exists(RootDirectory / "1")); + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(std::filesystem::exists(RootDirectory / "1")); + + { + IoBuffer DataChunk; + IoBuffer BoopChunk; + + { + BlockStoreFile File1(RootDirectory / "1"); + File1.Open(); + File1.MarkAsDeleteOnClose(); + DataChunk = File1.GetChunk(0, 5); + BoopChunk = File1.GetChunk(5, 5); + } + + const char* Data = static_cast<const char*>(DataChunk.GetData()); + CHECK(std::string(Data) == "data"); + const char* Boop = static_cast<const char*>(BoopChunk.GetData()); + CHECK(std::string(Boop) == "boop"); + } + CHECK(!std::filesystem::exists(RootDirectory / "1")); +} + +namespace blockstore::impl { + BlockStoreLocation WriteStringAsChunk(BlockStore& Store, std::string_view String, size_t PayloadAlignment) + { + BlockStoreLocation Location; + Store.WriteChunk(String.data(), String.length(), PayloadAlignment, [&](const BlockStoreLocation& L) { Location = L; }); + CHECK(Location.Size == String.length()); + return Location; + }; + + std::string ReadChunkAsString(BlockStore& Store, const BlockStoreLocation& Location) + { + IoBuffer ChunkData = Store.TryGetChunk(Location); + if (!ChunkData) + { + return ""; + } + std::string AsString((const char*)ChunkData.Data(), ChunkData.Size()); + return AsString; + }; + + std::vector<std::filesystem::path> GetDirectoryContent(std::filesystem::path RootDir, bool Files, bool Directories) + { + DirectoryContent DirectoryContent; + GetDirectoryContent(RootDir, + DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) | + (Directories ? DirectoryContent::IncludeDirsFlag : 0), + DirectoryContent); + std::vector<std::filesystem::path> Result; + Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end()); + Result.insert(Result.end(), DirectoryContent.Files.begin(), DirectoryContent.Files.end()); + return Result; + }; + + static IoBuffer CreateChunk(uint64_t Size) + { + static std::random_device rd; + static std::mt19937 g(rd()); + + std::vector<uint8_t> Values; + Values.resize(Size); + for (size_t Idx = 0; Idx < Size; ++Idx) + { + Values[Idx] = static_cast<uint8_t>(Idx); + } + std::shuffle(Values.begin(), Values.end(), g); + + return IoBufferBuilder::MakeCloneFromMemory(Values.data(), Values.size()); + } +} // namespace blockstore::impl + +TEST_CASE("blockstore.chunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory, 128, 1024, {}); + IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + BlockStoreLocation ThirdChunkLocation = WriteStringAsChunk(Store, ThirdChunkData, 4); + CHECK(ThirdChunkLocation.BlockIndex != FirstChunkLocation.BlockIndex); + + CHECK(ReadChunkAsString(Store, FirstChunkLocation) == FirstChunkData); + CHECK(ReadChunkAsString(Store, SecondChunkLocation) == SecondChunkData); + CHECK(ReadChunkAsString(Store, ThirdChunkLocation) == ThirdChunkData); +} + +TEST_CASE("blockstore.clean.stray.blocks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + Store.Close(); + + // Not referencing the second block means that we should be deleted + Store.Initialize(RootDirectory / "store", 128, 1024, {FirstChunkLocation, SecondChunkLocation}); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 1); +} + +TEST_CASE("blockstore.flush.forces.new.block") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 128, 1024, {}); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + WriteStringAsChunk(Store, FirstChunkData, 4); + Store.Flush(); + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + std::string ThirdChunkData = + "This is a much longer string that will not fit in the first block so it should be placed in the second block"; + WriteStringAsChunk(Store, ThirdChunkData, 4); + + CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 3); +} + +TEST_CASE("blockstore.iterate.chunks") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", ScrubSmallChunkWindowSize * 2, 1024, {}); + IoBuffer BadChunk = Store.TryGetChunk({.BlockIndex = 0, .Offset = 0, .Size = 512}); + CHECK(!BadChunk); + + std::string FirstChunkData = "This is the data of the first chunk that we will write"; + BlockStoreLocation FirstChunkLocation = WriteStringAsChunk(Store, FirstChunkData, 4); + + std::string SecondChunkData = "This is the data for the second chunk that we will write"; + BlockStoreLocation SecondChunkLocation = WriteStringAsChunk(Store, SecondChunkData, 4); + Store.Flush(); + + std::string VeryLargeChunk(ScrubSmallChunkWindowSize * 2, 'L'); + BlockStoreLocation VeryLargeChunkLocation = WriteStringAsChunk(Store, VeryLargeChunk, 4); + + BlockStoreLocation BadLocationZeroSize = {.BlockIndex = 0, .Offset = 0, .Size = 0}; + BlockStoreLocation BadLocationOutOfRange = {.BlockIndex = 0, + .Offset = ScrubSmallChunkWindowSize, + .Size = ScrubSmallChunkWindowSize * 2}; + BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; + + Store.IterateChunks( + {FirstChunkLocation, SecondChunkLocation, VeryLargeChunkLocation, BadLocationZeroSize, BadLocationOutOfRange, BadBlockIndex}, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + switch (ChunkIndex) + { + case 0: + CHECK(Data); + CHECK(Size == FirstChunkData.size()); + CHECK(std::string((const char*)Data, Size) == FirstChunkData); + break; + case 1: + CHECK(Data); + CHECK(Size == SecondChunkData.size()); + CHECK(std::string((const char*)Data, Size) == SecondChunkData); + break; + case 2: + CHECK(false); + break; + case 3: + CHECK(!Data); + break; + case 4: + CHECK(!Data); + break; + case 5: + CHECK(!Data); + break; + default: + CHECK(false); + break; + } + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + switch (ChunkIndex) + { + case 0: + case 1: + CHECK(false); + break; + case 2: + { + CHECK(Size == VeryLargeChunk.size()); + char* Buffer = new char[Size]; + size_t HashOffset = 0; + File.StreamByteRange(Offset, Size, [&](const void* Data, uint64_t Size) { + memcpy(&Buffer[HashOffset], Data, Size); + HashOffset += Size; + }); + CHECK(memcmp(Buffer, VeryLargeChunk.data(), Size) == 0); + delete[] Buffer; + } + break; + case 3: + CHECK(false); + break; + case 4: + CHECK(false); + break; + case 5: + CHECK(false); + break; + default: + CHECK(false); + break; + } + }); +} + +TEST_CASE("blockstore.reclaim.space") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 512, 1024, {}); + + constexpr size_t ChunkCount = 200; + constexpr size_t Alignment = 8; + std::vector<BlockStoreLocation> ChunkLocations; + std::vector<IoHash> ChunkHashes; + ChunkLocations.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex); + + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations.push_back(L); }); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<size_t> ChunksToKeep; + ChunksToKeep.reserve(ChunkLocations.size()); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + Store.Flush(); + BlockStore::ReclaimSnapshotState State1 = Store.GetReclaimSnapshotState(); + Store.ReclaimSpace(State1, ChunkLocations, ChunksToKeep, Alignment, true); + + // If we keep all the chunks we should not get any callbacks on moved/deleted stuff + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [](const BlockStore::MovedChunksArray&, const BlockStore::ChunkIndexArray&) { CHECK(false); }, + []() { + CHECK(false); + return 0; + }); + + size_t DeleteChunkCount = 38; + ChunksToKeep.clear(); + for (size_t ChunkIndex = DeleteChunkCount; ChunkIndex < ChunkCount; ++ChunkIndex) + { + ChunksToKeep.push_back(ChunkIndex); + } + + std::vector<BlockStoreLocation> NewChunkLocations = ChunkLocations; + size_t MovedChunkCount = 0; + size_t DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + ChunksToKeep, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + for (const auto& MovedChunk : MovedChunks) + { + CHECK(MovedChunk.first >= DeleteChunkCount); + NewChunkLocations[MovedChunk.first] = MovedChunk.second; + } + MovedChunkCount += MovedChunks.size(); + for (size_t DeletedIndex : DeletedChunks) + { + CHECK(DeletedIndex < DeleteChunkCount); + } + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(MovedChunkCount <= DeleteChunkCount); + CHECK(DeletedChunkCount == DeleteChunkCount); + ChunkLocations = std::vector<BlockStoreLocation>(NewChunkLocations.begin() + DeleteChunkCount, NewChunkLocations.end()); + + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer ChunkBlock = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); + if (ChunkIndex >= DeleteChunkCount) + { + IoBuffer VerifyChunk = Store.TryGetChunk(NewChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + } + } + + NewChunkLocations = ChunkLocations; + MovedChunkCount = 0; + DeletedChunkCount = 0; + Store.ReclaimSpace( + State1, + ChunkLocations, + {}, + Alignment, + false, + [&](const BlockStore::MovedChunksArray& MovedChunks, const BlockStore::ChunkIndexArray& DeletedChunks) { + CHECK(MovedChunks.empty()); + DeletedChunkCount += DeletedChunks.size(); + }, + []() { + CHECK(false); + return 0; + }); + CHECK(DeletedChunkCount == ChunkCount - DeleteChunkCount); +} + +TEST_CASE("blockstore.thread.read.write") +{ + using namespace blockstore::impl; + + ScopedTemporaryDirectory TempDir; + auto RootDirectory = TempDir.Path(); + + BlockStore Store; + Store.Initialize(RootDirectory / "store", 1088, 1024, {}); + + constexpr size_t ChunkCount = 1000; + constexpr size_t Alignment = 8; + std::vector<IoBuffer> Chunks; + std::vector<IoHash> ChunkHashes; + Chunks.reserve(ChunkCount); + ChunkHashes.reserve(ChunkCount); + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + IoBuffer Chunk = CreateChunk(57 + ChunkIndex / 2); + Chunks.push_back(Chunk); + ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size())); + } + + std::vector<BlockStoreLocation> ChunkLocations; + ChunkLocations.resize(ChunkCount); + + WorkerThreadPool WorkerPool(8); + std::atomic<size_t> WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &ChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { ChunkLocations[ChunkIndex] = L; }); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size()) + { + Sleep(1); + } + + std::vector<BlockStoreLocation> SecondChunkLocations; + SecondChunkLocations.resize(ChunkCount); + WorkCompleted = 0; + for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + WorkerPool.ScheduleWork([&Store, ChunkIndex, &Chunks, &SecondChunkLocations, &WorkCompleted]() { + IoBuffer& Chunk = Chunks[ChunkIndex]; + Store.WriteChunk(Chunk.Data(), Chunk.Size(), Alignment, [&](const BlockStoreLocation& L) { + SecondChunkLocations[ChunkIndex] = L; + }); + WorkCompleted.fetch_add(1); + }); + WorkerPool.ScheduleWork([&Store, ChunkIndex, &ChunkLocations, &ChunkHashes, &WorkCompleted]() { + IoBuffer VerifyChunk = Store.TryGetChunk(ChunkLocations[ChunkIndex]); + CHECK(VerifyChunk); + IoHash VerifyHash = IoHash::HashBuffer(VerifyChunk.Data(), VerifyChunk.Size()); + CHECK(VerifyHash == ChunkHashes[ChunkIndex]); + WorkCompleted.fetch_add(1); + }); + } + while (WorkCompleted < Chunks.size() * 2) + { + Sleep(1); + } +} + +#endif + +void +blockstore_forcelink() +{ +} + +} // namespace zen |