aboutsummaryrefslogtreecommitdiff
path: root/zenstore/compactcas.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-23 11:23:24 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:29:27 +0200
commit4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73 (patch)
tree75fac9f50f2d6f64fc435e62913b3d494f739972 /zenstore/compactcas.cpp
parentAdd separate blockstore.h/.cpp (diff)
downloadzen-4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73.tar.xz
zen-4dd1ae392ff26a7ba27f1cf8f0bb3fcbb4a04d73.zip
Use blockstore in compactcas
Diffstat (limited to 'zenstore/compactcas.cpp')
-rw-r--r--zenstore/compactcas.cpp272
1 files changed, 45 insertions, 227 deletions
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 1f49dd7c2..fbdac491e 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -58,148 +58,6 @@ namespace {
} // namespace
-struct CasContainerStrategy::ChunkBlock
-{
- explicit ChunkBlock(const std::filesystem::path& BlockPath);
- ~ChunkBlock();
- const std::filesystem::path& GetPath() const;
- void Open();
- void Create(uint64_t InitialSize);
- void MarkAsDeleteOnClose(std::error_code& Ec);
- uint64_t FileSize();
- IoBuffer GetChunk(uint64_t Offset, uint64_t Size);
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Flush();
- void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
-
-private:
- void InternalOpen();
- const std::filesystem::path m_Path;
- RwLock m_OpenLock;
- BasicFile m_File;
- IoBuffer m_IoBuffer;
-};
-
-CasContainerStrategy::ChunkBlock::ChunkBlock(const std::filesystem::path& BlockPath) : m_Path(BlockPath)
-{
-}
-
-CasContainerStrategy::ChunkBlock::~ChunkBlock()
-{
- RwLock::ExclusiveLockScope _(m_OpenLock);
- m_File.Detach();
-}
-
-const std::filesystem::path&
-CasContainerStrategy::ChunkBlock::GetPath() const
-{
- return m_Path;
-}
-
-void
-CasContainerStrategy::ChunkBlock::InternalOpen()
-{
- if (m_File.Handle())
- {
- return;
- }
- m_File.Open(m_Path, false);
- void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
-}
-
-void
-CasContainerStrategy::ChunkBlock::Open()
-{
- RwLock::ExclusiveLockScope _(m_OpenLock);
- InternalOpen();
-}
-
-void
-CasContainerStrategy::ChunkBlock::Create(uint64_t InitialSize)
-{
- RwLock::ExclusiveLockScope _(m_OpenLock);
-
- auto ParentPath = m_Path.parent_path();
- if (!std::filesystem::is_directory(ParentPath))
- {
- CreateDirectories(ParentPath);
- }
-
- m_File.Open(m_Path, true);
- if (InitialSize > 0)
- {
- m_File.SetFileSize(InitialSize);
- }
- void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
-}
-
-uint64_t
-CasContainerStrategy::ChunkBlock::FileSize()
-{
- RwLock::SharedLockScope _(m_OpenLock);
- return m_File.FileSize();
-}
-
-void
-CasContainerStrategy::ChunkBlock::MarkAsDeleteOnClose(std::error_code& Ec)
-{
- RwLock::ExclusiveLockScope _(m_OpenLock);
- if (m_File.Handle())
- {
- m_File.MarkAsDeleteOnClose(Ec);
- return;
- }
- if (std::filesystem::is_regular_file(m_Path))
- {
- Ec.clear();
- std::filesystem::remove(m_Path, Ec);
- }
-}
-
-IoBuffer
-CasContainerStrategy::ChunkBlock::GetChunk(uint64_t Offset, uint64_t Size)
-{
- InternalOpen();
- return IoBuffer(m_IoBuffer, Offset, Size);
-}
-
-void
-CasContainerStrategy::ChunkBlock::Read(void* Data, uint64_t Size, uint64_t FileOffset)
-{
- RwLock::SharedLockScope _(m_OpenLock);
- m_File.Read(Data, Size, FileOffset);
-}
-
-void
-CasContainerStrategy::ChunkBlock::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
-{
- RwLock::SharedLockScope _(m_OpenLock);
- m_File.Write(Data, Size, FileOffset);
-}
-
-void
-CasContainerStrategy::ChunkBlock::Flush()
-{
- RwLock::ExclusiveLockScope _(m_OpenLock);
- if (!m_File.Handle())
- {
- return;
- }
- m_File.Flush();
-}
-
-void
-CasContainerStrategy::ChunkBlock::StreamByteRange(uint64_t FileOffset,
- uint64_t Size,
- std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
-{
- RwLock::SharedLockScope _(m_OpenLock);
- m_File.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
-}
-
//////////////////////////////////////////////////////////////////////////
CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
@@ -233,9 +91,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint3
CasStore::InsertResult
CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash)
{
- uint32_t WriteBlockIndex;
- std::shared_ptr<ChunkBlock> WriteBlock;
- uint64_t InsertOffset;
+ uint32_t WriteBlockIndex;
+ std::shared_ptr<BlockStoreFile> WriteBlock;
+ uint64_t InsertOffset;
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
@@ -261,17 +119,17 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
}
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
{
throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
}
WriteBlockIndex += WriteBlock ? 1 : 0;
while (m_ChunkBlocks.contains(WriteBlockIndex))
{
- WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex;
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
}
auto BlockPath = BuildUcasPath(m_BlocksBasePath, WriteBlockIndex);
- WriteBlock = std::make_shared<ChunkBlock>(BlockPath);
+ WriteBlock = std::make_shared<BlockStoreFile>(BlockPath);
m_ChunkBlocks[WriteBlockIndex] = WriteBlock;
m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
}
@@ -289,13 +147,13 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
WriteBlock->Write(ChunkData, ChunkSize, InsertOffset);
- const CasLocation Location(WriteBlockIndex, InsertOffset, ChunkSize);
- CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = CasDiskLocation(Location, m_PayloadAlignment)};
+ const BlockStoreLocation Location(WriteBlockIndex, InsertOffset, ChunkSize);
+ CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)};
m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize));
{
RwLock::ExclusiveLockScope __(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, CasDiskLocation(Location, m_PayloadAlignment));
+ m_LocationMap.emplace(ChunkHash, BlockStoreDiskLocation(Location, m_PayloadAlignment));
m_CasLog.Append(IndexEntry);
}
@@ -311,8 +169,8 @@ CasContainerStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBuffer
CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
{
- std::shared_ptr<ChunkBlock> ChunkBlock;
- CasLocation Location;
+ std::shared_ptr<BlockStoreFile> ChunkBlock;
+ BlockStoreLocation Location;
{
RwLock::SharedLockScope _(m_LocationMapLock);
auto KeyIt = m_LocationMap.find(ChunkHash);
@@ -363,7 +221,7 @@ CasContainerStrategy::Flush()
WriteBlockIndex++;
while (m_ChunkBlocks.contains(WriteBlockIndex))
{
- WriteBlockIndex = (WriteBlockIndex + 1) & CasDiskLocation::MaxBlockIndex;
+ WriteBlockIndex = (WriteBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
}
WriteBlock->Flush();
m_WriteBlock.reset();
@@ -409,8 +267,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (auto& Entry : m_LocationMap)
{
- const CasLocation Location = Entry.second.Get(m_PayloadAlignment);
- const uint64_t EntryOffset = Location.Offset;
+ const BlockStoreLocation Location = Entry.second.Get(m_PayloadAlignment);
+ const uint64_t EntryOffset = Location.Offset;
if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
@@ -443,9 +301,9 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
for (const CasDiskIndexEntry& Entry : BigChunks)
{
- IoHashStream Hasher;
- const CasLocation Location = Entry.Location.Get(m_PayloadAlignment);
- auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex];
+ IoHashStream Hasher;
+ const BlockStoreLocation Location = Entry.Location.Get(m_PayloadAlignment);
+ auto& BlockFile = *m_ChunkBlocks[Location.BlockIndex];
BlockFile.StreamByteRange(Location.Offset, Location.Size, [&](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
IoHash ComputedHash = Hasher.GetHash();
@@ -516,8 +374,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// path to the next new block.
ZEN_INFO("collecting garbage from '{}'", m_Config.RootDirectory / m_ContainerBaseName);
- std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> LocationMap;
- size_t BlockCount;
+ std::unordered_map<IoHash, BlockStoreDiskLocation, IoHash::Hasher> LocationMap;
+ size_t BlockCount;
{
RwLock::SharedLockScope _i(m_InsertLock);
RwLock::SharedLockScope _l(m_LocationMapLock);
@@ -609,22 +467,22 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// Move all chunks in blocks that have chunks removed to new blocks
- std::shared_ptr<ChunkBlock> NewBlockFile;
- uint64_t WriteOffset = {};
- uint32_t NewBlockIndex = {};
- std::vector<IoHash> DeletedChunks;
+ std::shared_ptr<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = {};
+ uint32_t NewBlockIndex = {};
+ std::vector<IoHash> DeletedChunks;
DeletedChunks.reserve(DeleteCount);
std::vector<IoHash> MovedChunks;
DeletedChunks.reserve(MoveCount);
- std::unordered_map<IoHash, CasDiskLocation> MovedBlockChunks;
+ std::unordered_map<IoHash, BlockStoreDiskLocation> MovedBlockChunks;
for (auto BlockIndex : BlocksToReWrite)
{
const size_t ChunkMapIndex = BlockIndexToChunkMapIndex[BlockIndex];
const auto& KeepMap = KeepChunks[ChunkMapIndex];
if (KeepMap.empty())
{
- std::shared_ptr<ChunkBlock> BlockFile;
+ std::shared_ptr<BlockStoreFile> BlockFile;
{
RwLock::ExclusiveLockScope _i(m_LocationMapLock);
const auto& DeleteMap = DeleteChunks[ChunkMapIndex];
@@ -649,7 +507,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
continue;
}
- std::shared_ptr<ChunkBlock> OldBlockFile;
+ std::shared_ptr<BlockStoreFile> OldBlockFile;
{
RwLock::SharedLockScope _i(m_LocationMapLock);
OldBlockFile = m_ChunkBlocks[BlockIndex];
@@ -660,8 +518,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
std::vector<uint8_t> Chunk;
for (const auto& ChunkHash : KeepMap)
{
- auto KeyIt = LocationMap.find(ChunkHash);
- const CasLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
+ auto KeyIt = LocationMap.find(ChunkHash);
+ const BlockStoreLocation ChunkLocation = KeyIt->second.Get(m_PayloadAlignment);
Chunk.resize(ChunkLocation.Size);
OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
@@ -678,7 +536,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
MovedChunks.push_back(MovedEntry.first);
}
- if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
{
ZEN_ERROR("unable to allocate a new block in {}, count limit {} exeeded",
m_ContainerBaseName,
@@ -686,16 +544,16 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
return;
}
}
- if (m_ChunkBlocks.size() == CasDiskLocation::MaxBlockIndex)
+ if (m_ChunkBlocks.size() == BlockStoreDiskLocation::MaxBlockIndex)
{
throw std::runtime_error(fmt::format("unable to allocate a new block in {}", m_ContainerBaseName));
}
while (m_ChunkBlocks.contains(NextBlockIndex))
{
- NextBlockIndex = (NextBlockIndex + 1) & CasDiskLocation::MaxBlockIndex;
+ NextBlockIndex = (NextBlockIndex + 1) & BlockStoreDiskLocation::MaxBlockIndex;
}
auto NewBlockPath = BuildUcasPath(m_BlocksBasePath, NextBlockIndex);
- NewBlockFile = std::make_shared<ChunkBlock>(NewBlockPath);
+ NewBlockFile = std::make_shared<BlockStoreFile>(NewBlockPath);
m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
}
@@ -738,8 +596,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size());
- MovedBlockChunks.emplace(ChunkHash, CasDiskLocation(NewChunkLocation, m_PayloadAlignment));
+ BlockStoreLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size());
+ MovedBlockChunks.emplace(ChunkHash, BlockStoreDiskLocation(NewChunkLocation, m_PayloadAlignment));
WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
}
Chunk.clear();
@@ -1065,12 +923,12 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
BlockFile.SetFileSize(MaxUsedSize);
uint64_t MaxRequiredChunkCount = MaxUsedSize / m_MaxBlockSize;
- if (MaxRequiredChunkCount > CasDiskLocation::MaxBlockIndex)
+ if (MaxRequiredChunkCount > BlockStoreDiskLocation::MaxBlockIndex)
{
ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
m_Config.RootDirectory / m_ContainerBaseName,
MaxRequiredChunkCount,
- CasDiskLocation::MaxBlockIndex);
+ BlockStoreDiskLocation::MaxBlockIndex);
return;
}
@@ -1091,9 +949,9 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
m_CasLog.Open(SlogPath, true);
- std::unique_ptr<ChunkBlock> NewBlockFile;
- uint64_t WriteOffset = {};
- uint32_t NewBlockIndex = {};
+ std::unique_ptr<BlockStoreFile> NewBlockFile;
+ uint64_t WriteOffset = {};
+ uint32_t NewBlockIndex = {};
std::vector<uint8_t> Chunk;
for (const auto& ChunkHash : ChunkHashes)
@@ -1105,7 +963,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
if (!NewBlockFile)
{
auto BlockPath = BuildUcasPath(m_BlocksBasePath, NewBlockIndex);
- NewBlockFile = std::make_unique<ChunkBlock>(BlockPath);
+ NewBlockFile = std::make_unique<BlockStoreFile>(BlockPath);
NewBlockFile->Create(m_MaxBlockSize);
}
else if (WriteOffset + Chunk.size() > m_MaxBlockSize)
@@ -1114,13 +972,13 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
BlockFile.SetFileSize(ChunkEnd);
NewBlockIndex = NewBlockIndex + 1;
auto BlockPath = BuildUcasPath(m_BlocksBasePath, NewBlockIndex);
- NewBlockFile = std::make_unique<ChunkBlock>(BlockPath);
+ NewBlockFile = std::make_unique<BlockStoreFile>(BlockPath);
NewBlockFile->Create(m_MaxBlockSize);
WriteOffset = 0;
}
NewBlockFile->Write(Chunk.data(), Chunk.size(), WriteOffset);
- CasLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size());
- m_CasLog.Append({.Key = ChunkHash, .Location = CasDiskLocation(NewChunkLocation, m_PayloadAlignment)});
+ BlockStoreLocation NewChunkLocation(NewBlockIndex, WriteOffset, Chunk.size());
+ m_CasLog.Append({.Key = ChunkHash, .Location = BlockStoreDiskLocation(NewChunkLocation, m_PayloadAlignment)});
WriteOffset = RoundUp(WriteOffset + Chunk.size(), m_PayloadAlignment);
}
m_CasLog.Close();
@@ -1219,7 +1077,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
continue;
}
auto BlockPath = BuildUcasPath(m_BlocksBasePath, BlockIndex);
- auto BlockFile = std::make_shared<ChunkBlock>(BlockPath);
+ auto BlockFile = std::make_shared<BlockStoreFile>(BlockPath);
m_ChunkBlocks[BlockIndex] = BlockFile;
}
}
@@ -1303,46 +1161,6 @@ namespace {
}
} // namespace
-bool
-operator==(const CasLocation& Lhs, const CasLocation& Rhs)
-{
- return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size;
-}
-
-TEST_CASE("compactcas.casdisklocation")
-{
- CasLocation Zero = CasLocation{.BlockIndex = 0, .Offset = 0, .Size = 0};
- CHECK(Zero == CasDiskLocation(Zero, 4).Get(4));
-
- CasLocation MaxBlockIndex = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, .Offset = 0, .Size = 0};
- CHECK(MaxBlockIndex == CasDiskLocation(MaxBlockIndex, 4).Get(4));
-
- CasLocation MaxOffset = CasLocation{.BlockIndex = 0, .Offset = CasDiskLocation::MaxOffset * 4, .Size = 0};
- CHECK(MaxOffset == CasDiskLocation(MaxOffset, 4).Get(4));
-
- CasLocation MaxSize = CasLocation{.BlockIndex = 0, .Offset = 0, .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxSize == CasDiskLocation(MaxSize, 4).Get(4));
-
- CasLocation MaxBlockIndexAndOffset =
- CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex, .Offset = CasDiskLocation::MaxOffset * 4, .Size = 0};
- CHECK(MaxBlockIndexAndOffset == CasDiskLocation(MaxBlockIndexAndOffset, 4).Get(4));
-
- CasLocation MaxAll = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex,
- .Offset = CasDiskLocation::MaxOffset * 4,
- .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxAll == CasDiskLocation(MaxAll, 4).Get(4));
-
- CasLocation MaxAll4096 = CasLocation{.BlockIndex = CasDiskLocation::MaxBlockIndex,
- .Offset = CasDiskLocation::MaxOffset * 4096,
- .Size = std::numeric_limits<uint32_t>::max()};
- CHECK(MaxAll4096 == CasDiskLocation(MaxAll4096, 4096).Get(4096));
-
- CasLocation Middle = CasLocation{.BlockIndex = (CasDiskLocation::MaxBlockIndex) / 2,
- .Offset = ((CasDiskLocation::MaxOffset) / 2) * 4,
- .Size = std::numeric_limits<uint32_t>::max() / 2};
- CHECK(Middle == CasDiskLocation(Middle, 4).Get(4));
-}
-
TEST_CASE("compactcas.hex")
{
uint32_t Value;
@@ -1990,7 +1808,7 @@ TEST_CASE("compactcas.legacyconversion")
LegacyCasLog.Open(SlogPath, true);
for (const auto& Entry : LogEntries)
{
- CasLocation Location = Entry.Location.Get(16);
+ BlockStoreLocation Location = Entry.Location.Get(16);
LegacyCasDiskLocation LegacyLocation(Location.Offset, Location.Size);
LegacyCasDiskIndexEntry LegacyEntry = {.Key = Entry.Key,
.Location = LegacyLocation,