aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-02 16:41:32 +0200
committerDan Engelbrecht <[email protected]>2022-05-02 16:41:32 +0200
commitbb7593c9ea3412a48b3d29f3e7f7b23d7a785b2f (patch)
tree133f4ada4cce17ca808ec65249aab7f2d2a83ae1
parentMerge branch 'de/block-store-refactor' of github.com:EpicGames/zen into de/bl... (diff)
downloadzen-bb7593c9ea3412a48b3d29f3e7f7b23d7a785b2f.tar.xz
zen-bb7593c9ea3412a48b3d29f3e7f7b23d7a785b2f.zip
Refactor WriteChunk to not need callback
-rw-r--r--zenserver/cache/structuredcachestore.cpp46
-rw-r--r--zenstore/blockstore.cpp19
-rw-r--r--zenstore/compactcas.cpp19
-rw-r--r--zenstore/include/zenstore/blockstore.h20
4 files changed, 42 insertions, 62 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 015912ce9..6f6f182b9 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -860,7 +860,7 @@ ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
{
BasicFile BlockFile;
BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();strcut
+ BlockFileSize = BlockFile.FileSize();
}
std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
@@ -1845,31 +1845,25 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
EntryFlags |= DiskLocation::kCompressed;
}
- uint64_t ChunkSize = Value.Value.Size();
-
- m_BlockStore.WriteChunk(Value.Value.Data(),
- ChunkSize,
- m_PayloadAlignment,
- [this, &HashKey, EntryFlags](const BlockStoreLocation& BlockStoreLocation) {
- DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
- const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
- m_SlogFile.Append(DiskIndexEntry);
- m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst);
- RwLock::ExclusiveLockScope __(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
- {
- // TODO: should check if write is idempotent and bail out if it is?
- // this would requiring comparing contents on disk unless we add a
- // content hash to the index entry
- IndexEntry& Entry = It.value();
- Entry.Location = Location;
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- }
- else
- {
- m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
- }
- });
+ BlockStoreLocation BlockStoreLocation = m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment);
+ DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
+ const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
+ m_SlogFile.Append(DiskIndexEntry);
+ m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order_seq_cst);
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ // this would requiring comparing contents on disk unless we add a
+ // content hash to the index entry
+ IndexEntry& Entry = It.value();
+ Entry.Location = Location;
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ }
+ else
+ {
+ m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
+ }
}
//////////////////////////////////////////////////////////////////////////
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index d293eb97d..178687b5f 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -193,8 +193,8 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath,
}
}
-void
-BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, WriteChunkCallback Callback)
+BlockStoreLocation
+BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment)
{
RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
@@ -227,15 +227,11 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, Writ
uint64_t InsertOffset = m_CurrentInsertOffset;
m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
Ref<BlockStoreFile> WriteBlock = m_WriteBlock;
- m_ActiveWriteBlockIndexes.push_back(WriteBlockIndex);
InsertLock.ReleaseNow();
WriteBlock->Write(Data, Size, InsertOffset);
- Callback({.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size});
-
- RwLock::ExclusiveLockScope _(m_InsertLock);
- m_ActiveWriteBlockIndexes.erase(std::find(m_ActiveWriteBlockIndexes.begin(), m_ActiveWriteBlockIndexes.end(), WriteBlockIndex));
+ return {.BlockIndex = WriteBlockIndex, .Offset = InsertOffset, .Size = Size};
}
BlockStore::ReclaimSnapshotState
@@ -243,11 +239,8 @@ BlockStore::GetReclaimSnapshotState()
{
ReclaimSnapshotState State;
RwLock::ExclusiveLockScope _(m_InsertLock);
- for (uint32_t BlockIndex : m_ActiveWriteBlockIndexes)
- {
- State.ExcludeBlockIndexes.insert(BlockIndex);
- }
- State.BlockCount = m_ChunkBlocks.size();
+ State.ExcludeBlockIndex = m_WriteBlock ? m_WriteBlockIndex.load(std::memory_order_acquire) : 0xffffffffu;
+ State.BlockCount = m_ChunkBlocks.size();
_.ReleaseNow();
return State;
}
@@ -351,7 +344,7 @@ BlockStore::ReclaimSpace(const ReclaimSnapshotState& Snapshot,
{
const BlockStoreLocation& Location = ChunkLocations[Index];
OldTotalSize += Location.Size;
- if (Snapshot.ExcludeBlockIndexes.contains(Location.BlockIndex))
+ if (Location.BlockIndex == Snapshot.ExcludeBlockIndex)
{
continue;
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index 5a0ba974b..c6115024b 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -250,16 +250,15 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
// This should be a rare occasion and the current flow reduces the time we block for
// reads, insert and GC.
- m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [this, &ChunkHash, ChunkSize](const BlockStoreLocation& Location) {
- BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
- const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
- m_CasLog.Append(IndexEntry);
- {
- RwLock::ExclusiveLockScope _(m_LocationMapLock);
- m_LocationMap.emplace(ChunkHash, DiskLocation);
- }
- m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
- });
+ BlockStoreLocation Location = m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment);
+ BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment);
+ const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation};
+ m_CasLog.Append(IndexEntry);
+ {
+ RwLock::ExclusiveLockScope _(m_LocationMapLock);
+ m_LocationMap.emplace(ChunkHash, DiskLocation);
+ }
+ m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order_seq_cst);
return CasStore::InsertResult{.New = true};
}
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 4a1642413..5019e257d 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -114,8 +114,8 @@ class BlockStore
public:
struct ReclaimSnapshotState
{
- std::unordered_set<uint32_t> ExcludeBlockIndexes;
- size_t BlockCount;
+ size_t ExcludeBlockIndex;
+ size_t BlockCount;
};
typedef std::vector<std::pair<size_t, BlockStoreLocation>> MovedChunksArray;
@@ -123,7 +123,6 @@ public:
typedef std::function<void(const MovedChunksArray& MovedChunks, const ChunkIndexArray& RemovedChunks)> ReclaimCallback;
typedef std::function<uint64_t()> ClaimDiskReserveCallback;
- typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
typedef std::function<void(size_t ChunkIndex, BasicFile& BlockFile, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback;
@@ -133,11 +132,7 @@ public:
uint64_t MaxBlockCount,
const std::vector<BlockStoreLocation>& KnownLocations);
- void WriteChunk(
- const void* Data,
- uint64_t Size,
- uint64_t Alignment,
- WriteChunkCallback Callback = [](const BlockStoreLocation&) {});
+ BlockStoreLocation WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment);
Ref<BlockStoreFile> GetChunkBlock(const BlockStoreLocation& Location);
void Flush();
@@ -171,11 +166,10 @@ public:
private:
std::unordered_map<uint32_t, Ref<BlockStoreFile>> m_ChunkBlocks;
- RwLock m_InsertLock; // used to serialize inserts
- Ref<BlockStoreFile> m_WriteBlock;
- std::uint64_t m_CurrentInsertOffset = 0;
- std::atomic_uint32_t m_WriteBlockIndex{};
- std::vector<uint32_t> m_ActiveWriteBlockIndexes;
+ RwLock m_InsertLock; // used to serialize inserts
+ Ref<BlockStoreFile> m_WriteBlock;
+ std::uint64_t m_CurrentInsertOffset = 0;
+ std::atomic_uint32_t m_WriteBlockIndex{};
uint64_t m_MaxBlockSize = 1u << 28;
uint64_t m_MaxBlockCount = BlockStoreDiskLocation::MaxBlockIndex + 1;