diff options
| author | Stefan Boberg <[email protected]> | 2023-10-23 15:24:20 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-23 15:24:20 +0200 |
| commit | fb413405e43d3a717b642360d73cfc61258e63b3 (patch) | |
| tree | 5106b0482253219e02fa25c87a1ac0208d9d8600 /src | |
| parent | fix m_LastFullGcDuration, m_LastFullGCDiff, m_LastFullGcDuration and m_LastLi... (diff) | |
| download | zen-fb413405e43d3a717b642360d73cfc61258e63b3.tar.xz zen-fb413405e43d3a717b642360d73cfc61258e63b3.zip | |
chunking moved to zenstore (#490)
* moved chunking into zenstore
* removed vestiges of experimental chunking command
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/chunk/chunk.cpp | 1216 | ||||
| -rw-r--r-- | src/zen/chunk/chunk.h | 25 | ||||
| -rw-r--r-- | src/zenstore/chunking.cpp | 382 | ||||
| -rw-r--r-- | src/zenstore/chunking.h | 56 |
4 files changed, 438 insertions, 1241 deletions
diff --git a/src/zen/chunk/chunk.cpp b/src/zen/chunk/chunk.cpp deleted file mode 100644 index 81dde4c6a..000000000 --- a/src/zen/chunk/chunk.cpp +++ /dev/null @@ -1,1216 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "chunk.h" - -#if 0 -# include <gsl/gsl-lite.hpp> - -# include <zencore/filesystem.h> -# include <zencore/iohash.h> -# include <zencore/logging.h> -# include <zencore/refcount.h> -# include <zencore/scopeguard.h> -# include <zencore/sha1.h> -# include <zencore/string.h> -# include <zencore/testing.h> -# include <zencore/thread.h> -# include <zencore/timer.h> -# include <zenstore/gc.h> - -# include "../internalfile.h" - -# include <lz4.h> -# include <zstd.h> - -# if ZEN_PLATFORM_WINDOWS -# include <ppl.h> -# include <ppltasks.h> -# endif // ZEN_PLATFORM_WINDOWS - -# include <cmath> -# include <filesystem> -# include <random> -# include <vector> - -////////////////////////////////////////////////////////////////////////// - -# if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - -namespace Concurrency { - -template<typename IterType, typename LambdaType> -void -parallel_for_each(IterType Cursor, IterType End, const LambdaType& Lambda) -{ - for (; Cursor < End; ++Cursor) - { - Lambda(*Cursor); - } -} - -template<typename T> -struct combinable -{ - T& local() { return Value; } - - template<typename LambdaType> - void combine_each(const LambdaType& Lambda) - { - Lambda(Value); - } - - T Value = {}; -}; - -struct task_group -{ - template<class Function> - void run(const Function& Func) - { - Func(); - } - - void wait() {} -}; - -} // namespace Concurrency - -# endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - -////////////////////////////////////////////////////////////////////////// - -namespace detail { -static const uint32_t buzhashTable[] = { - 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801, 0x7ebf5191, 0x841135c7, 0x65cc53b3, - 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494, 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, - 0x05569956, 0xe8cc1f68, 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93, 0x9bfd7c64, - 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00, 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, - 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2, 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, - 0xe840c4c9, 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5, 0xb19165cd, 0x9891c393, - 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed, 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, - 0xd8770976, 0x9833466a, 0xc674df7f, 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4, - 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85, 0xbef8f0e1, 0x21d73653, 0x4e3d977a, - 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6, 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, - 0x375a2fe9, 0x425570b6, 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f, 0x1bc0dfb5, - 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2, 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, - 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b, 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, - 0xe0d8f8ae, 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f, 0x686a5b83, 0x50e072e5, - 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18, 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, - 0x7fa3429b, 0xe9158a1b, 0x225ea19a, 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140, - 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9, 0x19727a23, 0x15a7e374, 0xc43a18d5, - 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9, 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, - 0xf901b4fd, 0xdbc13dbc, 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b, 0x32baf4a9, - 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c, 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, - 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319, 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, - 0xc6eb57bb, 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b, 0x329e5388, 0x91dd236b, - 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964, 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, - 0xf9c18d66, 0x593ade65, 0xd95ddf11, -}; - -// ROL operation (compiler turns this into a ROL when optimizing) -static inline uint32_t -Rotate32(uint32_t Value, size_t RotateCount) -{ - RotateCount &= 31; - - return ((Value) << (RotateCount)) | ((Value) >> (32 - RotateCount)); -} -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -class ZenChunker -{ -public: - void SetChunkSize(size_t MinSize, size_t MaxSize, size_t AvgSize); - size_t ScanChunk(const void* DataBytes, size_t ByteCount); - void Reset(); - - // This controls which chunking approach is used - threshold or - // modulo based. Threshold is faster and generates similarly sized - // chunks - void SetUseThreshold(bool NewState) { m_useThreshold = NewState; } - - inline size_t ChunkSizeMin() const { return m_chunkSizeMin; } - inline size_t ChunkSizeMax() const { return m_chunkSizeMax; } - inline size_t ChunkSizeAvg() const { return m_chunkSizeAvg; } - inline uint64_t BytesScanned() const { return m_bytesScanned; } - - static constexpr size_t NoBoundaryFound = size_t(~0ull); - -private: - size_t m_chunkSizeMin = 0; - size_t m_chunkSizeMax = 0; - size_t m_chunkSizeAvg = 0; - - uint32_t m_discriminator = 0; // Computed in SetChunkSize() - uint32_t m_threshold = 0; // Computed in SetChunkSize() - - bool m_useThreshold = true; - - static constexpr size_t kChunkSizeLimitMax = 64 * 1024 * 1024; - static constexpr size_t kChunkSizeLimitMin = 1024; - - static constexpr size_t kDefaultAverageChunkSize = 64 * 1024; - - static constexpr int kWindowSize = 48; - uint8_t m_window[kWindowSize]; - uint32_t m_windowSize = 0; - - uint32_t m_currentHash = 0; - uint32_t m_currentChunkSize = 0; - - uint64_t m_bytesScanned = 0; - - size_t InternalScanChunk(const void* DataBytes, size_t ByteCount); - void InternalReset(); -}; - -void -ZenChunker::Reset() -{ - InternalReset(); - - m_bytesScanned = 0; -} - -void -ZenChunker::InternalReset() -{ - m_currentHash = 0; - m_currentChunkSize = 0; - m_windowSize = 0; -} - -void -ZenChunker::SetChunkSize(size_t MinSize, size_t MaxSize, size_t AvgSize) -{ - if (m_windowSize) - return; // Already started - - static_assert(kChunkSizeLimitMin > kWindowSize); - - if (AvgSize) - { - // TODO: Validate AvgSize range - } - else - { - if (MinSize && MaxSize) - { - AvgSize = lrint(pow(2, (log2(MinSize) + log2(MaxSize)) / 2)); - } - else if (MinSize) - { - AvgSize = MinSize * 4; - } - else if (MaxSize) - { - AvgSize = MaxSize / 4; - } - else - { - AvgSize = kDefaultAverageChunkSize; - } - } - - if (MinSize) - { - // TODO: Validate MinSize range - } - else - { - MinSize = std::max(AvgSize / 4, kChunkSizeLimitMin); - } - - if (MaxSize) - { - // TODO: Validate MaxSize range - } - else - { - MaxSize = std::min(AvgSize * 4, kChunkSizeLimitMax); - } - - m_discriminator = gsl::narrow<uint32_t>(AvgSize - MinSize); - - if (m_discriminator < MinSize) - { - m_discriminator = gsl::narrow<uint32_t>(MinSize); - } - - if (m_discriminator > MaxSize) - { - m_discriminator = gsl::narrow<uint32_t>(MaxSize); - } - - m_threshold = gsl::narrow<uint32_t>((uint64_t(std::numeric_limits<uint32_t>::max()) + 1) / m_discriminator); - - m_chunkSizeMin = MinSize; - m_chunkSizeMax = MaxSize; - m_chunkSizeAvg = AvgSize; -} - -size_t -ZenChunker::ScanChunk(const void* DataBytesIn, size_t ByteCount) -{ - size_t Result = InternalScanChunk(DataBytesIn, ByteCount); - - if (Result == NoBoundaryFound) - { - m_bytesScanned += ByteCount; - } - else - { - m_bytesScanned += Result; - } - - return Result; -} - -size_t -ZenChunker::InternalScanChunk(const void* DataBytesIn, size_t ByteCount) -{ - size_t CurrentOffset = 0; - const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(DataBytesIn); - - // There's no point in updating the hash if we know we're not - // going to have a cut point, so just skip the data. This logic currently - // provides roughly a 20% speedup on my machine - - const size_t NeedHashOffset = m_chunkSizeMin - kWindowSize; - - if (m_currentChunkSize < NeedHashOffset) - { - const uint32_t SkipBytes = gsl::narrow<uint32_t>(std::min<uint64_t>(ByteCount, NeedHashOffset - m_currentChunkSize)); - - ByteCount -= SkipBytes; - m_currentChunkSize += SkipBytes; - CurrentOffset += SkipBytes; - CursorPtr += SkipBytes; - - m_windowSize = 0; - - if (ByteCount == 0) - { - return NoBoundaryFound; - } - } - - // Fill window first - - if (m_windowSize < kWindowSize) - { - const uint32_t FillBytes = uint32_t(std::min<size_t>(ByteCount, kWindowSize - m_windowSize)); - - memcpy(&m_window[m_windowSize], CursorPtr, FillBytes); - - CursorPtr += FillBytes; - - m_windowSize += FillBytes; - m_currentChunkSize += FillBytes; - - CurrentOffset += FillBytes; - ByteCount -= FillBytes; - - if (m_windowSize < kWindowSize) - { - return NoBoundaryFound; - } - - // We have a full window, initialize hash - - uint32_t CurrentHash = 0; - - for (int i = 1; i < kWindowSize; ++i) - { - CurrentHash ^= detail::Rotate32(detail::buzhashTable[m_window[i - 1]], kWindowSize - i); - } - - m_currentHash = CurrentHash ^ detail::buzhashTable[m_window[kWindowSize - 1]]; - } - - // Scan for boundaries (i.e points where the hash matches the value determined by - // the discriminator) - - uint32_t CurrentHash = m_currentHash; - uint32_t CurrentChunkSize = m_currentChunkSize; - - size_t Index = CurrentChunkSize % kWindowSize; - - if (m_threshold && m_useThreshold) - { - // This is roughly 4x faster than the general modulo approach on my - // TR 3990X (~940MB/sec) and doesn't require any special parameters to - // achieve max performance - - while (ByteCount) - { - const uint8_t NewByte = *CursorPtr; - const uint8_t OldByte = m_window[Index]; - - CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::buzhashTable[OldByte], m_windowSize) ^ - detail::buzhashTable[NewByte]; - - CurrentChunkSize++; - CurrentOffset++; - - if (CurrentChunkSize >= m_chunkSizeMin) - { - bool foundBreak; - - if (CurrentChunkSize >= m_chunkSizeMax) - { - foundBreak = true; - } - else - { - foundBreak = CurrentHash <= m_threshold; - } - - if (foundBreak) - { - // Boundary found! - InternalReset(); - - return CurrentOffset; - } - } - - m_window[Index++] = *CursorPtr; - - if (Index == kWindowSize) - { - Index = 0; - } - - ++CursorPtr; - --ByteCount; - } - } - else if ((m_discriminator & (m_discriminator - 1)) == 0) - { - // This is quite a bit faster than the generic modulo path, but - // requires a very specific average chunk size to be used. If you - // pass in an even power-of-two divided by 0.75 as the average - // chunk size you'll hit this path - - const uint32_t Mask = m_discriminator - 1; - - while (ByteCount) - { - const uint8_t NewByte = *CursorPtr; - const uint8_t OldByte = m_window[Index]; - - CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::buzhashTable[OldByte], m_windowSize) ^ - detail::buzhashTable[NewByte]; - - CurrentChunkSize++; - CurrentOffset++; - - if (CurrentChunkSize >= m_chunkSizeMin) - { - bool foundBreak; - - if (CurrentChunkSize >= m_chunkSizeMax) - { - foundBreak = true; - } - else - { - foundBreak = (CurrentHash & Mask) == Mask; - } - - if (foundBreak) - { - // Boundary found! - InternalReset(); - - return CurrentOffset; - } - } - - m_window[Index++] = *CursorPtr; - - if (Index == kWindowSize) - { - Index = 0; - } - - ++CursorPtr; - --ByteCount; - } - } - else - { - // This is the slowest path, which caps out around 250MB/sec for large sizes - // on my TR3900X - - while (ByteCount) - { - const uint8_t NewByte = *CursorPtr; - const uint8_t OldByte = m_window[Index]; - - CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::buzhashTable[OldByte], m_windowSize) ^ - detail::buzhashTable[NewByte]; - - CurrentChunkSize++; - CurrentOffset++; - - if (CurrentChunkSize >= m_chunkSizeMin) - { - bool foundBreak; - - if (CurrentChunkSize >= m_chunkSizeMax) - { - foundBreak = true; - } - else - { - foundBreak = (CurrentHash % m_discriminator) == (m_discriminator - 1); - } - - if (foundBreak) - { - // Boundary found! - InternalReset(); - - return CurrentOffset; - } - } - - m_window[Index++] = *CursorPtr; - - if (Index == kWindowSize) - { - Index = 0; - } - - ++CursorPtr; - --ByteCount; - } - } - - m_currentChunkSize = CurrentChunkSize; - m_currentHash = CurrentHash; - - return NoBoundaryFound; -} - -////////////////////////////////////////////////////////////////////////// - -class DirectoryScanner -{ -public: - struct FileEntry - { - std::filesystem::path Path; - uint64_t FileSize; - }; - - const std::vector<FileEntry>& Files() { return m_Files; } - std::vector<FileEntry>&& TakeFiles() { return std::move(m_Files); } - uint64_t FileBytes() const { return m_FileBytes; } - - void Scan(std::filesystem::path RootPath) - { - for (const std::filesystem::directory_entry& Entry : std::filesystem::recursive_directory_iterator(RootPath)) - { - if (Entry.is_regular_file()) - { - m_Files.push_back({Entry.path(), Entry.file_size()}); - m_FileBytes += Entry.file_size(); - } - } - } - -private: - std::vector<FileEntry> m_Files; - uint64_t m_FileBytes = 0; -}; - -////////////////////////////////////////////////////////////////////////// - -class BaseChunker -{ -public: - void SetCasStore(zen::CasStore* CasStore) { m_CasStore = CasStore; } - - struct StatsBlock - { - uint64_t TotalBytes = 0; - uint64_t TotalChunks = 0; - uint64_t TotalCompressed = 0; - uint64_t UniqueBytes = 0; - uint64_t UniqueChunks = 0; - uint64_t UniqueCompressed = 0; - uint64_t DuplicateBytes = 0; - uint64_t NewCasChunks = 0; - uint64_t NewCasBytes = 0; - - StatsBlock& operator+=(const StatsBlock& Rhs) - { - TotalBytes += Rhs.TotalBytes; - TotalChunks += Rhs.TotalChunks; - TotalCompressed += Rhs.TotalCompressed; - UniqueBytes += Rhs.UniqueBytes; - UniqueChunks += Rhs.UniqueChunks; - UniqueCompressed += Rhs.UniqueCompressed; - DuplicateBytes += Rhs.DuplicateBytes; - NewCasChunks += Rhs.NewCasChunks; - NewCasBytes += Rhs.NewCasBytes; - return *this; - } - }; - -protected: - Concurrency::combinable<StatsBlock> m_StatsBlock; - -public: - StatsBlock SumStats() - { - StatsBlock _; - m_StatsBlock.combine_each([&](const StatsBlock& Block) { _ += Block; }); - return _; - } - -protected: - struct HashSet - { - bool Add(const zen::IoHash& Hash) - { - const uint8_t ShardNo = Hash.Hash[19]; - - Bucket& Shard = m_Buckets[ShardNo]; - - zen::RwLock::ExclusiveLockScope _(Shard.HashLock); - - auto rv = Shard.Hashes.insert(Hash); - - return rv.second; - } - - private: - struct alignas(64) Bucket - { - zen::RwLock HashLock; - std::unordered_set<zen::IoHash, zen::IoHash::Hasher> Hashes; -# if ZEN_PLATFORM_WINDOWS -# pragma warning(suppress : 4324) // Padding due to alignment -# endif - }; - - Bucket m_Buckets[256]; - }; - - zen::CasStore* m_CasStore = nullptr; -}; - -class FixedBlockSizeChunker : public BaseChunker -{ -public: - FixedBlockSizeChunker(std::filesystem::path InRootPath) : m_RootPath(InRootPath) {} - ~FixedBlockSizeChunker() = default; - - void SetChunkSize(uint64_t ChunkSize) - { - /* TODO: verify validity of chunk size */ - m_ChunkSize = ChunkSize; - } - void SetUseCompression(bool UseCompression) { m_UseCompression = UseCompression; } - void SetPerformValidation(bool PerformValidation) { m_PerformValidation = PerformValidation; } - - void InitCompression() - { - if (!m_CompressionBufferManager) - { - std::call_once(m_CompressionInitFlag, [&] { - // Wasteful, but should only be temporary - m_CompressionBufferManager.reset(new FileBufferManager(m_ChunkSize * 2, 128)); - }); - } - } - - void ChunkFile(const DirectoryScanner::FileEntry& File) - { - InitCompression(); - - std::filesystem::path RelativePath{std::filesystem::relative(File.Path.generic_string(), m_RootPath)}; - - Concurrency::task_group ChunkProcessTasks; - - ZEN_INFO("Chunking {} ({})", RelativePath.generic_string(), zen::NiceBytes(File.FileSize)); - - zen::RefPtr<InternalFile> Zfile = new InternalFile; - Zfile->OpenRead(File.Path); - - size_t FileBytes = Zfile->GetFileSize(); - uint64_t CurrentFileOffset = 0; - - std::vector<zen::IoHash> BlockHashes{(FileBytes + m_ChunkSize - 1) / m_ChunkSize}; - - while (FileBytes) - { - zen::IoBuffer Buffer = m_BufferManager.AllocBuffer(); - - const size_t BytesToRead = std::min(FileBytes, Buffer.Size()); - - Zfile->Read((void*)Buffer.Data(), BytesToRead, CurrentFileOffset); - - auto ProcessChunk = [this, Buffer, &BlockHashes, CurrentFileOffset, BytesToRead] { - StatsBlock& Stats = m_StatsBlock.local(); - for (uint64_t Offset = 0; Offset < BytesToRead; Offset += m_ChunkSize) - { - const uint8_t* DataPointer = reinterpret_cast<const uint8_t*>(Buffer.Data()) + Offset; - const uint64_t DataSize = std::min(BytesToRead - Offset, m_ChunkSize); - const zen::IoHash Hash = zen::IoHash::HashBuffer(DataPointer, DataSize); - - BlockHashes[(CurrentFileOffset + Offset) / m_ChunkSize] = Hash; - - const bool IsNew = m_LocalHashSet.Add(Hash); - - if (IsNew) - { - if (m_UseCompression) - { - if (true) - { - // Compress using ZSTD - - // TODO: use CompressedBuffer format - - const size_t CompressBufferSize = ZSTD_compressBound(DataSize); - - zen::IoBuffer CompressedBuffer = m_CompressionBufferManager->AllocBuffer(); - char* CompressBuffer = (char*)CompressedBuffer.Data(); - - ZEN_ASSERT(CompressedBuffer.Size() >= CompressBufferSize); - - const size_t CompressedSize = ZSTD_compress(CompressBuffer, - CompressBufferSize, - (const char*)DataPointer, - DataSize, - ZSTD_CLEVEL_DEFAULT); - - Stats.UniqueCompressed += CompressedSize; - - if (m_CasStore) - { - const zen::IoHash CompressedHash = zen::IoHash::HashBuffer(CompressBuffer, CompressedSize); - zen::IoBuffer CompressedData = zen::IoBuffer(zen::IoBuffer::Wrap, CompressBuffer, CompressedSize); - zen::CasStore::InsertResult Result = m_CasStore->InsertChunk(CompressedData, CompressedHash); - - if (Result.New) - { - Stats.NewCasChunks += 1; - Stats.NewCasBytes += CompressedSize; - } - } - - m_CompressionBufferManager->ReturnBuffer(CompressedBuffer); - } - else - { - // Compress using LZ4 - const int CompressBufferSize = LZ4_compressBound(gsl::narrow<int>(DataSize)); - - zen::IoBuffer CompressedBuffer = m_CompressionBufferManager->AllocBuffer(); - char* CompressBuffer = (char*)CompressedBuffer.Data(); - - ZEN_ASSERT(CompressedBuffer.Size() >= size_t(CompressBufferSize)); - - const int CompressedSize = LZ4_compress_default((const char*)DataPointer, - CompressBuffer, - gsl::narrow<int>(DataSize), - CompressBufferSize); - - Stats.UniqueCompressed += CompressedSize; - - if (m_CasStore) - { - const zen::IoHash CompressedHash = zen::IoHash::HashBuffer(CompressBuffer, CompressedSize); - zen::IoBuffer CompressedData = zen::IoBuffer(zen::IoBuffer::Wrap, CompressBuffer, CompressedSize); - zen::CasStore::InsertResult Result = m_CasStore->InsertChunk(CompressedData, CompressedHash); - - if (Result.New) - { - Stats.NewCasChunks += 1; - Stats.NewCasBytes += CompressedSize; - } - } - - m_CompressionBufferManager->ReturnBuffer(CompressedBuffer); - } - } - else if (m_CasStore) - { - zen::CasStore::InsertResult Result = m_CasStore->InsertChunk(zen::IoBuffer(Buffer, Offset, DataSize), Hash); - - if (Result.New) - { - Stats.NewCasChunks += 1; - Stats.NewCasBytes += DataSize; - } - } - - Stats.UniqueBytes += DataSize; - Stats.UniqueChunks += 1; - } - else - { - // We've seen this chunk before - Stats.DuplicateBytes += DataSize; - } - - Stats.TotalBytes += DataSize; - Stats.TotalChunks += 1; - } - - m_BufferManager.ReturnBuffer(Buffer); - }; - - ChunkProcessTasks.run(ProcessChunk); - - CurrentFileOffset += BytesToRead; - FileBytes -= BytesToRead; - } - - ChunkProcessTasks.wait(); - - // Verify pass - - if (!m_UseCompression && m_PerformValidation) - { - const uint8_t* FileData = reinterpret_cast<const uint8_t*>(Zfile->MemoryMapFile()); - uint64_t Offset = 0; - const uint64_t BytesToRead = Zfile->GetFileSize(); - - for (zen::IoHash& Hash : BlockHashes) - { - const uint64_t DataSize = std::min(BytesToRead - Offset, m_ChunkSize); - const zen::IoHash CalcHash = zen::IoHash::HashBuffer(FileData + Offset, DataSize); - - ZEN_ASSERT(CalcHash == Hash); - - zen::IoBuffer FoundValue = m_CasStore->FindChunk(CalcHash); - - ZEN_ASSERT(FoundValue); - ZEN_ASSERT(FoundValue.Size() == DataSize); - - Offset += DataSize; - } - } - } - -private: - std::filesystem::path m_RootPath; - FileBufferManager m_BufferManager{128 * 1024, 128}; - uint64_t m_ChunkSize = 64 * 1024; - HashSet m_LocalHashSet; - bool m_UseCompression = true; - bool m_PerformValidation = false; - - std::once_flag m_CompressionInitFlag; - std::unique_ptr<FileBufferManager> m_CompressionBufferManager; -}; - -class VariableBlockSizeChunker : public BaseChunker -{ -public: - VariableBlockSizeChunker(std::filesystem::path InRootPath) : m_RootPath(InRootPath) {} - - void SetAverageChunkSize(uint64_t AverageChunkSize) { m_AverageChunkSize = AverageChunkSize; } - void SetUseCompression(bool UseCompression) { m_UseCompression = UseCompression; } - - void ChunkFile(const DirectoryScanner::FileEntry& File) - { - std::filesystem::path RelativePath{std::filesystem::relative(File.Path.generic_string(), m_RootPath)}; - - ZEN_INFO("Chunking {} ({})", RelativePath.generic_string(), zen::NiceBytes(File.FileSize)); - - zen::RefPtr<InternalFile> Zfile = new InternalFile; - Zfile->OpenRead(File.Path); - - // Could use IoBuffer here to help manage lifetimes of things - // across tasks / threads - - ZenChunker Chunker; - Chunker.SetChunkSize(0, 0, m_AverageChunkSize); - - const size_t DataSize = Zfile->GetFileSize(); - - std::vector<size_t> Boundaries; - - uint64_t CurrentStreamPosition = 0; - uint64_t CurrentChunkSize = 0; - size_t RemainBytes = DataSize; - - zen::IoHashStream IoHashStream; - - while (RemainBytes != 0) - { - zen::IoBuffer Buffer = m_BufferManager.AllocBuffer(); - - size_t BytesToRead = std::min(RemainBytes, Buffer.Size()); - - uint8_t* DataPointer = (uint8_t*)Buffer.Data(); - - Zfile->Read(DataPointer, BytesToRead, CurrentStreamPosition); - - StatsBlock& Stats = m_StatsBlock.local(); - - while (BytesToRead) - { - const size_t Boundary = Chunker.ScanChunk(DataPointer, BytesToRead); - - if (Boundary == ZenChunker::NoBoundaryFound) - { - IoHashStream.Append(DataPointer, BytesToRead); - CurrentStreamPosition += BytesToRead; - CurrentChunkSize += BytesToRead; - RemainBytes -= BytesToRead; - break; - } - - // Boundary found - - IoHashStream.Append(DataPointer, Boundary); - - const zen::IoHash Hash = IoHashStream.GetHash(); - const bool IsNew = m_LocalHashSet.Add(Hash); - - CurrentStreamPosition += Boundary; - CurrentChunkSize += Boundary; - Boundaries.push_back(CurrentStreamPosition); - - if (IsNew) - { - Stats.UniqueBytes += CurrentChunkSize; - } - else - { - // We've seen this chunk before - Stats.DuplicateBytes += CurrentChunkSize; - } - - DataPointer += Boundary; - RemainBytes -= Boundary; - BytesToRead -= Boundary; - CurrentChunkSize = 0; - IoHashStream.Reset(); - } - - m_BufferManager.ReturnBuffer(Buffer); - -# if 0 - Active.AddCount(); // needs fixing - - Concurrency::create_task([this, Zfile, CurrentPosition, DataPointer, &Active] { - const zen::IoHash Hash = zen::IoHash::HashBuffer(DataPointer, CurrentPosition); - - const bool isNew = m_LocalHashSet.Add(Hash); - - const int CompressBufferSize = LZ4_compressBound(gsl::narrow<int>(CurrentPosition)); - char* CompressBuffer = (char*)_aligned_malloc(CompressBufferSize, 16); - - const int CompressedSize = - LZ4_compress_default((const char*)DataPointer, CompressBuffer, gsl::narrow<int>(CurrentPosition), CompressBufferSize); - - m_TotalCompressed.local() += CompressedSize; - - if (isNew) - { - m_UniqueBytes.local() += CurrentPosition; - m_UniqueCompressed.local() += CompressedSize; - - if (m_CasStore) - { - const zen::IoHash CompressedHash = zen::IoHash::HashBuffer(CompressBuffer, CompressedSize); - m_CasStore->InsertChunk(CompressBuffer, CompressedSize, CompressedHash); - } - } - - Active.Signal(); // needs fixing - - _aligned_free(CompressBuffer); - }); -# endif - } - - StatsBlock& Stats = m_StatsBlock.local(); - Stats.TotalBytes += DataSize; - Stats.TotalChunks += Boundaries.size() + 1; - - // TODO: Wait for all compression tasks - - auto ChunkCount = Boundaries.size() + 1; - - ZEN_INFO("Split {} ({}) into {} chunks, avg size {}", - RelativePath.generic_string(), - zen::NiceBytes(File.FileSize), - ChunkCount, - File.FileSize / ChunkCount); - }; - -private: - HashSet m_LocalHashSet; - std::filesystem::path m_RootPath; - uint64_t m_AverageChunkSize = 32 * 1024; - bool m_UseCompression = true; - FileBufferManager m_BufferManager{128 * 1024, 128}; -}; - -////////////////////////////////////////////////////////////////////////// - -ChunkCommand::ChunkCommand() -{ - m_Options.add_options()("r,root", "Root directory for CAS pool", cxxopts::value(m_RootDirectory)); - m_Options.add_options()("d,dir", "Directory to scan", cxxopts::value(m_ScanDirectory)); - m_Options.add_options()("c,chunk-size", "Use fixed chunk size", cxxopts::value(m_ChunkSize)); - m_Options.add_options()("a,average-chunk-size", "Use dynamic chunk size", cxxopts::value(m_AverageChunkSize)); - m_Options.add_options()("compress", "Apply compression to chunks", cxxopts::value(m_UseCompression)); -} - -ChunkCommand::~ChunkCommand() = default; - -int -ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return 0; - } - - bool IsValid = m_ScanDirectory.length(); - - if (!IsValid) - throw zen::OptionParseException("Chunk command requires a directory to scan"); - - if ((m_ChunkSize && m_AverageChunkSize) && (!m_ChunkSize && !m_AverageChunkSize)) - throw zen::OptionParseException("Either of --chunk-size or --average-chunk-size must be used"); - - std::unique_ptr<zen::CasStore> CasStore; - - zen::GcManager Gc; - - if (!m_RootDirectory.empty()) - { - zen::CasStoreConfiguration Config; - Config.RootDirectory = m_RootDirectory; - - CasStore = zen::CreateCasStore(Gc); - CasStore->Initialize(Config); - } - - // Gather list of files to process - - ZEN_INFO("Gathering files from {}", m_ScanDirectory); - - std::filesystem::path RootPath{m_ScanDirectory}; - DirectoryScanner Scanner; - Scanner.Scan(RootPath); - - auto Files = Scanner.TakeFiles(); - uint64_t FileBytes = Scanner.FileBytes(); - - std::sort(begin(Files), end(Files), [](const DirectoryScanner::FileEntry& Lhs, const DirectoryScanner::FileEntry& Rhs) { - return Lhs.FileSize < Rhs.FileSize; - }); - - ZEN_INFO("Gathered {} files, total size {}", Files.size(), zen::NiceBytes(FileBytes)); - - auto ReportSummary = [&](BaseChunker& Chunker, uint64_t ElapsedMs) { - const BaseChunker::StatsBlock& Stats = Chunker.SumStats(); - - const size_t TotalChunkCount = Stats.TotalChunks; - ZEN_INFO("Scanned {} files in {}, generated {} chunks", Files.size(), zen::NiceTimeSpanMs(ElapsedMs), TotalChunkCount); - - const size_t TotalByteCount = Stats.TotalBytes; - const size_t TotalCompressedBytes = Stats.TotalCompressed; - - ZEN_INFO("Total bytes {} ({}), compresses into {}", - zen::NiceBytes(TotalByteCount), - zen::NiceByteRate(TotalByteCount, ElapsedMs), - zen::NiceBytes(TotalCompressedBytes)); - - const size_t TotalUniqueBytes = Stats.UniqueBytes; - const size_t TotalUniqueCompressedBytes = Stats.UniqueCompressed; - const size_t TotalDuplicateBytes = Stats.DuplicateBytes; - - ZEN_INFO("Chunksize average {}, unique bytes = {} (compressed {}), dup bytes = {}", - TotalByteCount / TotalChunkCount, - zen::NiceBytes(TotalUniqueBytes), - zen::NiceBytes(TotalUniqueCompressedBytes), - zen::NiceBytes(TotalDuplicateBytes)); - - ZEN_INFO("New to CAS: {} chunks, {}", Stats.NewCasChunks, zen::NiceBytes(Stats.NewCasBytes)); - }; - - // Process them as quickly as possible - - if (m_AverageChunkSize) - { - VariableBlockSizeChunker Chunker{RootPath}; - Chunker.SetAverageChunkSize(m_AverageChunkSize); - Chunker.SetUseCompression(m_UseCompression); - Chunker.SetCasStore(CasStore.get()); - - zen::Stopwatch timer; - -# if 1 - Concurrency::parallel_for_each(begin(Files), end(Files), [&Chunker](const auto& ThisFile) { Chunker.ChunkFile(ThisFile); }); -# else - for (const auto& ThisFile : Files) - { - Chunker.ChunkFile(ThisFile); - } -# endif - - uint64_t ElapsedMs = timer.GetElapsedTimeMs(); - - ReportSummary(Chunker, ElapsedMs); - } - else if (m_ChunkSize) - { - FixedBlockSizeChunker Chunker{RootPath}; - Chunker.SetChunkSize(m_ChunkSize); - Chunker.SetUseCompression(m_UseCompression); - Chunker.SetCasStore(CasStore.get()); - - zen::Stopwatch timer; - - Concurrency::parallel_for_each(begin(Files), end(Files), [&Chunker](const DirectoryScanner::FileEntry& ThisFile) { - try - { - Chunker.ChunkFile(ThisFile); - } - catch (std::exception& ex) - { - zen::ExtendableStringBuilder<256> Path8; - zen::PathToUtf8(ThisFile.Path, Path8); - ZEN_WARN("Caught exception while chunking '{}': {}", Path8, ex.what()); - } - }); - - uint64_t ElapsedMs = timer.GetElapsedTimeMs(); - - ReportSummary(Chunker, ElapsedMs); - } - else - { - ZEN_ASSERT(false); - } - - // TODO: implement snapshot enumeration and display - return 0; -} - -////////////////////////////////////////////////////////////////////////// - -# if ZEN_WITH_TESTS -TEST_CASE("chunking") -{ - using namespace zen; - - auto test = [](bool UseThreshold, bool Random, int MinBlockSize, int MaxBlockSize) { - std::mt19937_64 mt; - - std::vector<uint64_t> bytes; - bytes.resize(1 * 1024 * 1024); - - if (Random == false) - { - // Generate a single block of randomness - for (auto& w : bytes) - { - w = mt(); - } - } - - for (int i = MinBlockSize; i <= MaxBlockSize; i <<= 1) - { - Stopwatch timer; - - ZenChunker chunker; - chunker.SetUseThreshold(UseThreshold); - chunker.SetChunkSize(0, 0, i); - // chunker.SetChunkSize(i / 4, i * 4, 0); - // chunker.SetChunkSize(i / 8, i * 8, 0); - // chunker.SetChunkSize(i / 16, i * 16, 0); - // chunker.SetChunkSize(0, 0, size_t(i / 0.75)); // Hits the fast modulo path - - std::vector<size_t> boundaries; - - size_t CurrentPosition = 0; - int BoundaryCount = 0; - - do - { - if (Random == true) - { - // Generate a new block of randomness for each pass - for (auto& w : bytes) - { - w = mt(); - } - } - - const uint8_t* Ptr = reinterpret_cast<const uint8_t*>(bytes.data()); - size_t BytesRemain = bytes.size() * sizeof(uint64_t); - - for (;;) - { - const size_t Boundary = chunker.ScanChunk(Ptr, BytesRemain); - - if (Boundary == ZenChunker::NoBoundaryFound) - { - CurrentPosition += BytesRemain; - break; - } - - // Boundary found - - CurrentPosition += Boundary; - - CHECK(CurrentPosition >= chunker.ChunkSizeMin()); - CHECK(CurrentPosition <= chunker.ChunkSizeMax()); - - boundaries.push_back(CurrentPosition); - - CurrentPosition = 0; - Ptr += Boundary; - BytesRemain -= Boundary; - - ++BoundaryCount; - } - } while (BoundaryCount < 5000); - - size_t BoundarySum = 0; - - for (const auto& v : boundaries) - { - BoundarySum += v; - } - - double Avg = double(BoundarySum) / BoundaryCount; - const uint64_t ElapsedTimeMs = timer.GetElapsedTimeMs(); - - ZEN_INFO("{:9} : Avg {:9} - {:2.5} ({:6}, {})", - i, - Avg, - double(i / Avg), - NiceTimeSpanMs(ElapsedTimeMs), - NiceByteRate(chunker.BytesScanned(), ElapsedTimeMs)); - } - }; - - const bool Random = false; - - SUBCASE("threshold method") { test(/* UseThreshold */ true, /* Random */ Random, 2048, 1 * 1024 * 1024); } - - SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); } -} -# endif -#endif diff --git a/src/zen/chunk/chunk.h b/src/zen/chunk/chunk.h deleted file mode 100644 index e796f4147..000000000 --- a/src/zen/chunk/chunk.h +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once -#include <zencore/zencore.h> -#include "../zen.h" - -#if 0 -class ChunkCommand : public ZenCmdBase -{ -public: - ChunkCommand(); - ~ChunkCommand(); - - virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; - virtual cxxopts::Options& Options() override { return m_Options; } - -private: - cxxopts::Options m_Options{"chunk", "Do a chunking pass"}; - std::string m_RootDirectory; - std::string m_ScanDirectory; - size_t m_ChunkSize = 0; - size_t m_AverageChunkSize = 0; - bool m_UseCompression = true; -}; -#endif // 0 diff --git a/src/zenstore/chunking.cpp b/src/zenstore/chunking.cpp new file mode 100644 index 000000000..80674de0a --- /dev/null +++ b/src/zenstore/chunking.cpp @@ -0,0 +1,382 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "chunking.h" + +#include <gsl/gsl-lite.hpp> + +#include <cmath> + +namespace zen::detail { + +static const uint32_t BuzhashTable[] = { + 0x458be752, 0xc10748cc, 0xfbbcdbb8, 0x6ded5b68, 0xb10a82b5, 0x20d75648, 0xdfc5665f, 0xa8428801, 0x7ebf5191, 0x841135c7, 0x65cc53b3, + 0x280a597c, 0x16f60255, 0xc78cbc3e, 0x294415f5, 0xb938d494, 0xec85c4e6, 0xb7d33edc, 0xe549b544, 0xfdeda5aa, 0x882bf287, 0x3116737c, + 0x05569956, 0xe8cc1f68, 0x0806ac5e, 0x22a14443, 0x15297e10, 0x50d090e7, 0x4ba60f6f, 0xefd9f1a7, 0x5c5c885c, 0x82482f93, 0x9bfd7c64, + 0x0b3e7276, 0xf2688e77, 0x8fad8abc, 0xb0509568, 0xf1ada29f, 0xa53efdfe, 0xcb2b1d00, 0xf2a9e986, 0x6463432b, 0x95094051, 0x5a223ad2, + 0x9be8401b, 0x61e579cb, 0x1a556a14, 0x5840fdc2, 0x9261ddf6, 0xcde002bb, 0x52432bb0, 0xbf17373e, 0x7b7c222f, 0x2955ed16, 0x9f10ca59, + 0xe840c4c9, 0xccabd806, 0x14543f34, 0x1462417a, 0x0d4a1f9c, 0x087ed925, 0xd7f8f24c, 0x7338c425, 0xcf86c8f5, 0xb19165cd, 0x9891c393, + 0x325384ac, 0x0308459d, 0x86141d7e, 0xc922116a, 0xe2ffa6b6, 0x53f52aed, 0x2cd86197, 0xf5b9f498, 0xbf319c8f, 0xe0411fae, 0x977eb18c, + 0xd8770976, 0x9833466a, 0xc674df7f, 0x8c297d45, 0x8ca48d26, 0xc49ed8e2, 0x7344f874, 0x556f79c7, 0x6b25eaed, 0xa03e2b42, 0xf68f66a4, + 0x8e8b09a2, 0xf2e0e62a, 0x0d3a9806, 0x9729e493, 0x8c72b0fc, 0x160b94f6, 0x450e4d3d, 0x7a320e85, 0xbef8f0e1, 0x21d73653, 0x4e3d977a, + 0x1e7b3929, 0x1cc6c719, 0xbe478d53, 0x8d752809, 0xe6d8c2c6, 0x275f0892, 0xc8acc273, 0x4cc21580, 0xecc4a617, 0xf5f7be70, 0xe795248a, + 0x375a2fe9, 0x425570b6, 0x8898dcf8, 0xdc2d97c4, 0x0106114b, 0x364dc22f, 0x1e0cad1f, 0xbe63803c, 0x5f69fac2, 0x4d5afa6f, 0x1bc0dfb5, + 0xfb273589, 0x0ea47f7b, 0x3c1c2b50, 0x21b2a932, 0x6b1223fd, 0x2fe706a8, 0xf9bd6ce2, 0xa268e64e, 0xe987f486, 0x3eacf563, 0x1ca2018c, + 0x65e18228, 0x2207360a, 0x57cf1715, 0x34c37d2b, 0x1f8f3cde, 0x93b657cf, 0x31a019fd, 0xe69eb729, 0x8bca7b9b, 0x4c9d5bed, 0x277ebeaf, + 0xe0d8f8ae, 0xd150821c, 0x31381871, 0xafc3f1b0, 0x927db328, 0xe95effac, 0x305a47bd, 0x426ba35b, 0x1233af3f, 0x686a5b83, 0x50e072e5, + 0xd9d3bb2a, 0x8befc475, 0x487f0de6, 0xc88dff89, 0xbd664d5e, 0x971b5d18, 0x63b14847, 0xd7d3c1ce, 0x7f583cf3, 0x72cbcb09, 0xc0d0a81c, + 0x7fa3429b, 0xe9158a1b, 0x225ea19a, 0xd8ca9ea3, 0xc763b282, 0xbb0c6341, 0x020b8293, 0xd4cd299d, 0x58cfa7f8, 0x91b4ee53, 0x37e4d140, + 0x95ec764c, 0x30f76b06, 0x5ee68d24, 0x679c8661, 0xa41979c2, 0xf2b61284, 0x4fac1475, 0x0adb49f9, 0x19727a23, 0x15a7e374, 0xc43a18d5, + 0x3fb1aa73, 0x342fc615, 0x924c0793, 0xbee2d7f0, 0x8a279de9, 0x4aa2d70c, 0xe24dd37f, 0xbe862c0b, 0x177c22c2, 0x5388e5ee, 0xcd8a7510, + 0xf901b4fd, 0xdbc13dbc, 0x6c0bae5b, 0x64efe8c7, 0x48b02079, 0x80331a49, 0xca3d8ae6, 0xf3546190, 0xfed7108b, 0xc49b941b, 0x32baf4a9, + 0xeb833a4a, 0x88a3f1a5, 0x3a91ce0a, 0x3cc27da1, 0x7112e684, 0x4a3096b1, 0x3794574c, 0xa3c8b6f3, 0x1d213941, 0x6e0a2e00, 0x233479f1, + 0x0f4cd82f, 0x6093edd2, 0x5d7d209e, 0x464fe319, 0xd4dcac9e, 0x0db845cb, 0xfb5e4bc3, 0xe0256ce1, 0x09fb4ed1, 0x0914be1e, 0xa5bdb2c3, + 0xc6eb57bb, 0x30320350, 0x3f397e91, 0xa67791bc, 0x86bc0e2c, 0xefa0a7e2, 0xe9ff7543, 0xe733612c, 0xd185897b, 0x329e5388, 0x91dd236b, + 0x2ecb0d93, 0xf4d82a3d, 0x35b5c03f, 0xe4e606f0, 0x05b21843, 0x37b45964, 0x5eff22f4, 0x6027f4cc, 0x77178b3c, 0xae507131, 0x7bf7cabc, + 0xf9c18d66, 0x593ade65, 0xd95ddf11, +}; + +// ROL operation (compiler turns this into a ROL when optimizing) +static inline uint32_t +Rotate32(uint32_t Value, size_t RotateCount) +{ + RotateCount &= 31; + + return ((Value) << (RotateCount)) | ((Value) >> (32 - RotateCount)); +} + +} // namespace zen::detail + +namespace zen { + +void +ZenChunkHelper::Reset() +{ + InternalReset(); + + m_BytesScanned = 0; +} + +void +ZenChunkHelper::InternalReset() +{ + m_CurrentHash = 0; + m_CurrentChunkSize = 0; + m_WindowSize = 0; +} + +void +ZenChunkHelper::SetChunkSize(size_t MinSize, size_t MaxSize, size_t AvgSize) +{ + if (m_WindowSize) + return; // Already started + + static_assert(kChunkSizeLimitMin > kWindowSize); + + if (AvgSize) + { + // TODO: Validate AvgSize range + } + else + { + if (MinSize && MaxSize) + { + AvgSize = std::lrint(std::pow(2, (std::log2(MinSize) + std::log2(MaxSize)) / 2)); + } + else if (MinSize) + { + AvgSize = MinSize * 4; + } + else if (MaxSize) + { + AvgSize = MaxSize / 4; + } + else + { + AvgSize = kDefaultAverageChunkSize; + } + } + + if (MinSize) + { + // TODO: Validate MinSize range + } + else + { + MinSize = std::max(AvgSize / 4, kChunkSizeLimitMin); + } + + if (MaxSize) + { + // TODO: Validate MaxSize range + } + else + { + MaxSize = std::min(AvgSize * 4, kChunkSizeLimitMax); + } + + m_Discriminator = gsl::narrow<uint32_t>(AvgSize - MinSize); + + if (m_Discriminator < MinSize) + { + m_Discriminator = gsl::narrow<uint32_t>(MinSize); + } + + if (m_Discriminator > MaxSize) + { + m_Discriminator = gsl::narrow<uint32_t>(MaxSize); + } + + m_Threshold = gsl::narrow<uint32_t>((uint64_t(std::numeric_limits<uint32_t>::max()) + 1) / m_Discriminator); + + m_ChunkSizeMin = MinSize; + m_ChunkSizeMax = MaxSize; + m_ChunkSizeAvg = AvgSize; +} + +size_t +ZenChunkHelper::ScanChunk(const void* DataBytesIn, size_t ByteCount) +{ + size_t Result = InternalScanChunk(DataBytesIn, ByteCount); + + if (Result == kNoBoundaryFound) + { + m_BytesScanned += ByteCount; + } + else + { + m_BytesScanned += Result; + } + + return Result; +} + +size_t +ZenChunkHelper::InternalScanChunk(const void* DataBytesIn, size_t ByteCount) +{ + size_t CurrentOffset = 0; + const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(DataBytesIn); + + // There's no point in updating the hash if we know we're not + // going to have a cut point, so just skip the data. This logic currently + // provides roughly a 20% speedup on my machine + + const size_t NeedHashOffset = m_ChunkSizeMin - kWindowSize; + + if (m_CurrentChunkSize < NeedHashOffset) + { + const uint32_t SkipBytes = gsl::narrow<uint32_t>(std::min<uint64_t>(ByteCount, NeedHashOffset - m_CurrentChunkSize)); + + ByteCount -= SkipBytes; + m_CurrentChunkSize += SkipBytes; + CurrentOffset += SkipBytes; + CursorPtr += SkipBytes; + + m_WindowSize = 0; + + if (ByteCount == 0) + { + return kNoBoundaryFound; + } + } + + // Fill window first + + if (m_WindowSize < kWindowSize) + { + const uint32_t FillBytes = uint32_t(std::min<size_t>(ByteCount, kWindowSize - m_WindowSize)); + + memcpy(&m_Window[m_WindowSize], CursorPtr, FillBytes); + + CursorPtr += FillBytes; + + m_WindowSize += FillBytes; + m_CurrentChunkSize += FillBytes; + + CurrentOffset += FillBytes; + ByteCount -= FillBytes; + + if (m_WindowSize < kWindowSize) + { + return kNoBoundaryFound; + } + + // We have a full window, initialize hash + + uint32_t CurrentHash = 0; + + for (int i = 1; i < kWindowSize; ++i) + { + CurrentHash ^= detail::Rotate32(detail::BuzhashTable[m_Window[i - 1]], kWindowSize - i); + } + + m_CurrentHash = CurrentHash ^ detail::BuzhashTable[m_Window[kWindowSize - 1]]; + } + + // Scan for boundaries (i.e points where the hash matches the value determined by + // the discriminator) + + uint32_t CurrentHash = m_CurrentHash; + uint32_t CurrentChunkSize = m_CurrentChunkSize; + + size_t Index = CurrentChunkSize % kWindowSize; + + if (m_Threshold && m_UseThreshold) + { + // This is roughly 4x faster than the general modulo approach on my + // TR 3990X (~940MB/sec) and doesn't require any special parameters to + // achieve max performance + + while (ByteCount) + { + const uint8_t NewByte = *CursorPtr; + const uint8_t OldByte = m_Window[Index]; + + CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::BuzhashTable[OldByte], m_WindowSize) ^ + detail::BuzhashTable[NewByte]; + + CurrentChunkSize++; + CurrentOffset++; + + if (CurrentChunkSize >= m_ChunkSizeMin) + { + bool FoundBoundary; + + if (CurrentChunkSize >= m_ChunkSizeMax) + { + FoundBoundary = true; + } + else + { + FoundBoundary = CurrentHash <= m_Threshold; + } + + if (FoundBoundary) + { + // Boundary found! + InternalReset(); + + return CurrentOffset; + } + } + + m_Window[Index++] = *CursorPtr; + + if (Index == kWindowSize) + { + Index = 0; + } + + ++CursorPtr; + --ByteCount; + } + } + else if ((m_Discriminator & (m_Discriminator - 1)) == 0) + { + // This is quite a bit faster than the generic modulo path, but + // requires a very specific average chunk size to be used. If you + // pass in an even power-of-two divided by 0.75 as the average + // chunk size you'll hit this path + + const uint32_t Mask = m_Discriminator - 1; + + while (ByteCount) + { + const uint8_t NewByte = *CursorPtr; + const uint8_t OldByte = m_Window[Index]; + + CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::BuzhashTable[OldByte], m_WindowSize) ^ + detail::BuzhashTable[NewByte]; + + CurrentChunkSize++; + CurrentOffset++; + + if (CurrentChunkSize >= m_ChunkSizeMin) + { + bool FoundBoundary; + + if (CurrentChunkSize >= m_ChunkSizeMax) + { + FoundBoundary = true; + } + else + { + FoundBoundary = (CurrentHash & Mask) == Mask; + } + + if (FoundBoundary) + { + // Boundary found! + InternalReset(); + + return CurrentOffset; + } + } + + m_Window[Index++] = *CursorPtr; + + if (Index == kWindowSize) + { + Index = 0; + } + + ++CursorPtr; + --ByteCount; + } + } + else + { + // This is the slowest path, which caps out around 250MB/sec for large sizes + // on my TR3900X + + while (ByteCount) + { + const uint8_t NewByte = *CursorPtr; + const uint8_t OldByte = m_Window[Index]; + + CurrentHash = detail::Rotate32(CurrentHash, 1) ^ detail::Rotate32(detail::BuzhashTable[OldByte], m_WindowSize) ^ + detail::BuzhashTable[NewByte]; + + CurrentChunkSize++; + CurrentOffset++; + + if (CurrentChunkSize >= m_ChunkSizeMin) + { + bool FoundBoundary; + + if (CurrentChunkSize >= m_ChunkSizeMax) + { + FoundBoundary = true; + } + else + { + FoundBoundary = (CurrentHash % m_Discriminator) == (m_Discriminator - 1); + } + + if (FoundBoundary) + { + // Boundary found! + InternalReset(); + + return CurrentOffset; + } + } + + m_Window[Index++] = *CursorPtr; + + if (Index == kWindowSize) + { + Index = 0; + } + + ++CursorPtr; + --ByteCount; + } + } + + m_CurrentChunkSize = CurrentChunkSize; + m_CurrentHash = CurrentHash; + + return kNoBoundaryFound; +} + +} // namespace zen diff --git a/src/zenstore/chunking.h b/src/zenstore/chunking.h new file mode 100644 index 000000000..09c56454f --- /dev/null +++ b/src/zenstore/chunking.h @@ -0,0 +1,56 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once +#include <zencore/zencore.h> + +namespace zen { + +/** Content-defined chunking helper + */ +class ZenChunkHelper +{ +public: + void SetChunkSize(size_t MinSize, size_t MaxSize, size_t AvgSize); + size_t ScanChunk(const void* DataBytes, size_t ByteCount); + void Reset(); + + // This controls which chunking approach is used - threshold or + // modulo based. Threshold is faster and generates similarly sized + // chunks + void SetUseThreshold(bool NewState) { m_UseThreshold = NewState; } + + inline size_t ChunkSizeMin() const { return m_ChunkSizeMin; } + inline size_t ChunkSizeMax() const { return m_ChunkSizeMax; } + inline size_t ChunkSizeAvg() const { return m_ChunkSizeAvg; } + inline uint64_t BytesScanned() const { return m_BytesScanned; } + + static constexpr size_t kNoBoundaryFound = size_t(~0ull); + +private: + size_t m_ChunkSizeMin = 0; + size_t m_ChunkSizeMax = 0; + size_t m_ChunkSizeAvg = 0; + + uint32_t m_Discriminator = 0; // Computed in SetChunkSize() + uint32_t m_Threshold = 0; // Computed in SetChunkSize() + + bool m_UseThreshold = true; + + static constexpr size_t kChunkSizeLimitMax = 64 * 1024 * 1024; + static constexpr size_t kChunkSizeLimitMin = 1024; + static constexpr size_t kDefaultAverageChunkSize = 64 * 1024; + + static constexpr int kWindowSize = 48; + uint8_t m_Window[kWindowSize]; + uint32_t m_WindowSize = 0; + + uint32_t m_CurrentHash = 0; + uint32_t m_CurrentChunkSize = 0; + + uint64_t m_BytesScanned = 0; + + size_t InternalScanChunk(const void* DataBytes, size_t ByteCount); + void InternalReset(); +}; + +} // namespace zen |