aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-14 15:26:37 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:28:31 +0200
commitb9329e02b21a7bbffd32277c51ea0050e5856027 (patch)
tree1d7ec65e06c1e79ce46c2d7eee9afa7b7cde0301
parentWIP (diff)
downloadzen-b9329e02b21a7bbffd32277c51ea0050e5856027.tar.xz
zen-b9329e02b21a7bbffd32277c51ea0050e5856027.zip
Split chunkbundler into size-limited blocks
-rw-r--r--zenstore/chunkbundler.cpp209
-rw-r--r--zenstore/chunkbundler.h42
2 files changed, 150 insertions, 101 deletions
diff --git a/zenstore/chunkbundler.cpp b/zenstore/chunkbundler.cpp
index ca5066e33..87b71df09 100644
--- a/zenstore/chunkbundler.cpp
+++ b/zenstore/chunkbundler.cpp
@@ -50,18 +50,58 @@ ChunkBundler::~ChunkBundler()
}
void
-ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore)
+ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore)
{
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
+ ZEN_ASSERT(MaxBlockSize > 0);
m_ContainerBaseName = ContainerBaseName;
m_PayloadAlignment = Alignment;
+ m_MaxBlockSize = MaxBlockSize;
std::filesystem::path SobsPath = m_RootDirectory / (m_ContainerBaseName + ".ucas");
std::filesystem::path SlogPath = m_RootDirectory / (m_ContainerBaseName + ".ulog");
- m_SmallObjectFile.Open(SobsPath, IsNewStore);
+ CreateDirectories(m_RootDirectory / "ucas");
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_RootDirectory / "ucas"))
+ {
+ if (Entry.is_regular_file())
+ {
+ if (IsNewStore)
+ {
+ std::filesystem::remove(Entry.path());
+ continue;
+ }
+ if (Entry.path().extension() == ".ucas")
+ {
+ try
+ {
+ std::string stem = Entry.path().stem().string();
+ uint16_t fileIndex = static_cast<uint16_t>(std::stoi(stem));
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(Entry.path(), false);
+ m_OpenBlocks[fileIndex] = SmallObjectFile;
+ m_CurrentFileIndex = std::max<uint16_t>(m_CurrentFileIndex, fileIndex);
+ }
+ catch(const std::invalid_argument&)
+ {
+ // Non-valid file, skip it
+ }
+ }
+ }
+ }
+ if (m_OpenBlocks.empty())
+ {
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
+ }
+
+ m_CurrentBlock = m_OpenBlocks[m_CurrentFileIndex];
+ m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(m_CurrentBlock.lock()->FileSize(), m_PayloadAlignment));
m_OpLog.Open(SlogPath, IsNewStore);
// TODO: should validate integrity of container files here
@@ -98,16 +138,12 @@ ChunkBundler::Initialize(const std::string_view ContainerBaseName, uint64_t Alig
}
});
- uint64_t MaxFileOffset = 0;
for (const auto& Entry : m_LocationMap)
{
const auto& Location = Entry.second;
- MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Location.GetOffset() + Location.GetSize());
- m_TotalSize.fetch_add(Location.GetSize());
+ m_TotalSize.fetch_add(Location.Size);
}
- m_CurrentInsertOffset = AlignPositon(MaxFileOffset, m_PayloadAlignment);
-
m_IsInitialized = true;
}
@@ -128,11 +164,22 @@ ChunkBundler::InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash&
// New entry
- const uint64_t InsertOffset = m_CurrentInsertOffset;
- m_SmallObjectFile.Write(ChunkData, ChunkSize, InsertOffset);
- m_CurrentInsertOffset = AlignPositon(m_CurrentInsertOffset + ChunkSize, m_PayloadAlignment);
+ uint64_t CurrentBlockSize = m_CurrentBlock.lock()->FileSize();
+ if (CurrentBlockSize + m_CurrentInsertOffset > m_MaxBlockSize)
+ {
+ m_CurrentFileIndex++;
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
+ }
+ const uint32_t InsertOffset = m_CurrentInsertOffset;
+ m_CurrentBlock.lock()->Write(ChunkData, ChunkSize, InsertOffset);
+ m_CurrentInsertOffset = static_cast<uint32_t>(AlignPositon(InsertOffset + ChunkSize, m_PayloadAlignment));
- const CompactDiskLocation Location{InsertOffset, ChunkSize};
+ const CompactDiskLocation Location{m_CurrentFileIndex, InsertOffset, static_cast<uint32_t>(ChunkSize)};
CompactDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = Location};
RwLock::ExclusiveLockScope __(m_LocationMapLock);
@@ -158,7 +205,10 @@ ChunkBundler::FindChunk(const IoHash& ChunkHash)
{
const CompactDiskLocation& Location = KeyIt->second;
- return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.GetOffset(), Location.GetSize());
+ if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end())
+ {
+ return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size);
+ }
}
// Not found
@@ -196,16 +246,13 @@ void
ChunkBundler::Flush()
{
m_OpLog.Flush();
- m_SmallObjectFile.Flush();
+ m_CurrentBlock.lock()->Flush();
}
void
ChunkBundler::Scrub(ScrubContext& Ctx)
{
const uint64_t WindowSize = 4 * 1024 * 1024;
- uint64_t WindowStart = 0;
- uint64_t WindowEnd = WindowSize;
- const uint64_t FileSize = m_SmallObjectFile.FileSize();
std::vector<CompactDiskIndexEntry> BigChunks;
std::vector<CompactDiskIndexEntry> BadChunks;
@@ -221,47 +268,57 @@ ChunkBundler::Scrub(ScrubContext& Ctx)
RwLock::SharedLockScope _(m_LocationMapLock);
- do
+ for (const auto& Block : m_OpenBlocks)
{
- const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
- m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
+ uint64_t WindowStart = 0;
+ uint64_t WindowEnd = WindowSize;
+ auto& SmallObjectFile = *Block.second;
+ const uint64_t FileSize = SmallObjectFile.FileSize();
- for (auto& Entry : m_LocationMap)
+ do
{
- const uint64_t EntryOffset = Entry.second.GetOffset();
+ const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart);
+ SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart);
- if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
+ for (auto& Entry : m_LocationMap)
{
- const uint64_t EntryEnd = EntryOffset + Entry.second.GetSize();
+ const uint64_t EntryOffset = Entry.second.Offset;
- if (EntryEnd >= WindowEnd)
+ if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd))
{
- BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
-
- continue;
- }
-
- if (m_Validator && !m_Validator->ValidateChunk(
- IoBuffer(IoBuffer::Wrap,
- reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.GetOffset() - WindowStart,
- Entry.second.GetSize()),
- Entry.first))
- {
- // Hash mismatch
- BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ const uint64_t EntryEnd = EntryOffset + Entry.second.Size;
+
+ if (EntryEnd >= WindowEnd)
+ {
+ BigChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+
+ continue;
+ }
+
+ if (m_Validator && !m_Validator->ValidateChunk(
+ IoBuffer(IoBuffer::Wrap,
+ reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart,
+ Entry.second.Size),
+ Entry.first))
+ {
+ // Hash mismatch
+ BadChunks.push_back({.Key = Entry.first, .Location = Entry.second});
+ }
}
}
- }
- WindowStart += WindowSize;
- WindowEnd += WindowSize;
- } while (WindowStart < FileSize);
-#if 0
+ WindowStart += WindowSize;
+ WindowEnd += WindowSize;
+ } while (WindowStart < FileSize);
+ }
+
// TODO: Figure out API for this
// Deal with large chunks
- for (const auto& Entry : BigChunks)
+#if 0
+ for (const auto& _ : BigChunks)
{
- IoHashStream Hasher;
+ auto& ___ = *m_OpenBlocks[Entry.Location.BlockIndex];
+ /* IoHashStream Hasher;
m_SmallObjectFile.StreamByteRange(Entry.Location.GetOffset(), Entry.Location.GetSize(), [&](const void* Data, uint64_t Size) {
Hasher.Append(Data, Size);
});
@@ -271,8 +328,9 @@ ChunkBundler::Scrub(ScrubContext& Ctx)
{
BadChunks.push_back(Entry);
}
+ */
}
-#endif
+#endif // 0
}
if (BadChunks.empty())
@@ -328,12 +386,16 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
// added betwen each move of a block.
ZEN_INFO("collecting garbage from '{}'", m_RootDirectory / m_ContainerBaseName);
+ std::vector<IoHash> DeletedChunks;
+ std::unordered_set<int> BlocksToReWrite;
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
RwLock::ExclusiveLockScope _l(m_LocationMapLock);
Flush();
+ BlocksToReWrite.reserve(m_OpenBlocks.size());
+
if (m_LocationMap.empty())
{
ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_RootDirectory / m_ContainerBaseName);
@@ -350,7 +412,6 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
TotalChunkHashes.push_back(Entry.first);
}
- std::vector<IoHash> DeletedChunks;
std::vector<IoHash> ChunkHashes; // Same sort order as ChunkLocations
ChunkHashes.reserve(m_LocationMap.size());
@@ -381,18 +442,18 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
std::sort(begin(ChunkHashes), end(ChunkHashes), [&](IoHash Lhs, IoHash Rhs) {
auto LhsKeyIt = m_LocationMap.find(Lhs);
auto RhsKeyIt = m_LocationMap.find(Rhs);
- return LhsKeyIt->second.GetOffset() < RhsKeyIt->second.GetOffset();
+ return LhsKeyIt->second.Offset < RhsKeyIt->second.Offset;
});
uint64_t NewTotalSize = 0;
std::vector<CompactDiskLocation> ChunkLocations;
ChunkLocations.reserve(ChunkHashes.size());
- for (auto Entry : ChunkHashes)
+ for (auto ChunkHash : ChunkHashes)
{
- auto KeyIt = m_LocationMap.find(Entry);
+ auto KeyIt = m_LocationMap.find(ChunkHash);
const auto& ChunkLocation = KeyIt->second;
ChunkLocations.push_back(ChunkLocation);
- NewTotalSize += ChunkLocation.GetSize();
+ NewTotalSize += ChunkLocation.Size;
}
if (!CollectSmallObjects)
@@ -406,20 +467,31 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
return;
}
- for (auto ChunkHash : reverse(DeletedChunks))
+ for (auto ChunkHash : DeletedChunks)
{
- auto KeyIt = m_LocationMap.find(ChunkHash);
- const auto& ChunkLocation = KeyIt->second;
- uint64_t NextChunkOffset = ChunkLocation.GetOffset() + ChunkLocation.GetSize();
+ auto KeyIt = m_LocationMap.find(ChunkHash);
+ const auto& ChunkLocation = KeyIt->second;
+ BlocksToReWrite.insert(ChunkLocation.BlockIndex);
m_OpLog.Append({.Key = ChunkHash, .Location = ChunkLocation, .Flags = CompactDiskIndexEntry::kTombstone});
m_LocationMap.erase(ChunkHash);
- if (m_CurrentInsertOffset == NextChunkOffset)
- {
- m_CurrentInsertOffset = ChunkLocation.GetOffset();
- }
- m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.GetSize()));
+ m_TotalSize.fetch_sub(static_cast<uint64_t>(ChunkLocation.Size));
+ }
+
+ if (BlocksToReWrite.contains(m_CurrentFileIndex))
+ {
+ m_CurrentFileIndex++;
+ std::filesystem::path path = m_RootDirectory / "ucas" / (std::to_string(m_CurrentFileIndex) + ".ucas");
+ auto SmallObjectFile = std::make_shared<BasicFile>();
+ SmallObjectFile->Open(path, true);
+ m_OpenBlocks[m_CurrentFileIndex] = SmallObjectFile;
+ m_CurrentBlock = SmallObjectFile;
+ m_CurrentInsertOffset = 0;
}
+ }
+ {
+ // Rewrite all BlocksToReWrite
+#if 0
// We can break here if we only want to remove items without compacting of space
std::vector<IoHash> MovedChunks;
@@ -511,15 +583,12 @@ ChunkBundler::CollectGarbage(GcContext& GcCtx)
{
m_CurrentInsertOffset = 0;
}
+#endif // 0
GcCtx.DeletedCas(DeletedChunks);
- uint64_t CurrentSize = m_SmallObjectFile.FileSize();
- ZEN_INFO("garbage collection complete '{}', space {} to {}, moved {} and delete {} chunks",
+ ZEN_INFO("garbage collection complete '{}', deleted {} chunks",
m_RootDirectory / m_ContainerBaseName,
- NiceBytes(CurrentSize),
- NiceBytes(m_CurrentInsertOffset),
- MovedChunks.size(),
DeletedChunks.size());
// TODO: Should we truncate the file or just keep the size of the file and reuse the space?
}
@@ -679,7 +748,7 @@ TEST_CASE("chunkbundler.reopen")
{
ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 16, true);
+ Store.Initialize("test", 65536, 16, true);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -712,7 +781,7 @@ TEST_CASE("chunkbundler.reopen")
{
ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 16, false);
+ Store.Initialize("test", 65536, 16, false);
for (int i = 0; i < kIterationCount; ++i)
{
@@ -757,7 +826,7 @@ TEST_CASE("chunkbundler.totalsize")
{
ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 16, true);
+ Store.Initialize("test", 65536, 16, true);
for (int32_t Idx = 0; Idx < kChunkCount; ++Idx)
{
@@ -773,7 +842,7 @@ TEST_CASE("chunkbundler.totalsize")
{
ChunkBundler Store(TempDir.Path(), nullptr);
- Store.Initialize("test", 16, false);
+ Store.Initialize("test", 65536, 16, false);
const uint64_t TotalSize = Store.StorageSize().DiskSize;
CHECK_EQ(kChunkSize * kChunkCount, TotalSize);
@@ -786,7 +855,7 @@ TEST_CASE("chunkbundler.gc.basic")
ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 1 << 4, true);
+ ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true);
IoBuffer Chunk = CreateChunk(128);
IoHash ChunkHash = IoHash::HashBuffer(Chunk);
@@ -806,7 +875,7 @@ TEST_CASE("chunkbundler.gc.compact")
ScopedTemporaryDirectory TempDir;
ChunkBundler ChunkBundlerStore(TempDir.Path(), nullptr);
- ChunkBundlerStore.Initialize("cb", 1 << 4, true);
+ ChunkBundlerStore.Initialize("cb", 65536, 1 << 4, true);
uint64_t ChunkSizes[9] = {128, 541, 1023, 781, 218, 37, 4, 997, 5};
IoBuffer Chunks[9] = {CreateChunk(ChunkSizes[0]),
diff --git a/zenstore/chunkbundler.h b/zenstore/chunkbundler.h
index 498320a6a..d84ee9627 100644
--- a/zenstore/chunkbundler.h
+++ b/zenstore/chunkbundler.h
@@ -29,34 +29,9 @@ public:
struct CompactDiskLocation
{
- CompactDiskLocation(uint64_t InOffset, uint64_t InSize)
- {
- ZEN_ASSERT(InOffset <= 0xff'ffff'ffff);
- ZEN_ASSERT(InSize <= 0xff'ffff'ffff);
-
- memcpy(&m_Offset[0], &InOffset, sizeof m_Offset);
- memcpy(&m_Size[0], &InSize, sizeof m_Size);
- }
-
- CompactDiskLocation() = default;
-
- inline uint64_t GetOffset() const
- {
- uint64_t Offset = 0;
- memcpy(&Offset, &m_Offset, sizeof m_Offset);
- return Offset;
- }
-
- inline uint64_t GetSize() const
- {
- uint64_t Size = 0;
- memcpy(&Size, &m_Size, sizeof m_Size);
- return Size;
- }
-
-private:
- uint8_t m_Offset[5];
- uint8_t m_Size[5];
+ uint16_t BlockIndex;
+ uint32_t Offset;
+ uint32_t Size;
};
struct CompactDiskIndexEntry
@@ -84,7 +59,7 @@ public:
bool New = false;
};
- void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore);
+ void Initialize(const std::string_view ContainerBaseName, uint64_t MaxBlockSize, uint64_t Alignment, bool IsNewStore);
InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
IoBuffer FindChunk(const IoHash& ChunkHash);
@@ -101,8 +76,8 @@ private:
ChunkBundlerValidator* m_Validator;
spdlog::logger& m_Log;
uint64_t m_PayloadAlignment = 1 << 4;
+ uint64_t m_MaxBlockSize = 1L << 30; // 1 Gb
bool m_IsInitialized = false;
- BasicFile m_SmallObjectFile;
BasicFile m_SmallObjectIndex;
std::filesystem::path RootDirectory;
TCasLogFile<CompactDiskIndexEntry> m_OpLog;
@@ -112,11 +87,16 @@ private:
RwLock m_LocationMapLock;
RwLock m_InsertLock; // used to serialize inserts
std::unordered_map<IoHash, CompactDiskLocation, IoHash::Hasher> m_LocationMap;
- std::atomic_uint64_t m_CurrentInsertOffset{};
+ std::atomic_uint32_t m_CurrentInsertOffset{};
std::atomic_uint64_t m_CurrentIndexOffset{};
std::atomic_uint64_t m_TotalSize{};
void MakeIndexSnapshot();
+
+ // Reserve one block of 1Gb
+ std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks;
+ std::weak_ptr<BasicFile> m_CurrentBlock;
+ uint16_t m_CurrentFileIndex = 0;
};
//////////////////////////////////////////////////////////////////////////