aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/chunkingcontroller.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/chunkingcontroller.cpp')
-rw-r--r--src/zenutil/chunkingcontroller.cpp275
1 files changed, 275 insertions, 0 deletions
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
new file mode 100644
index 000000000..2a7057a46
--- /dev/null
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -0,0 +1,275 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/chunkingcontroller.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+using namespace std::literals;
+
+namespace {
+ // Converts a compact-binary array view into a vector of owned strings.
+ // One output entry is produced per field in the array, in iteration order,
+ // using the value returned by CbFieldView::AsString() for that field.
+ std::vector<std::string> ReadStringArray(CbArrayView StringArray)
+ {
+     std::vector<std::string> Strings;
+     // Num() is used as the expected element count so the loop does not reallocate.
+     Strings.reserve(StringArray.Num());
+     for (CbFieldView Element : StringArray)
+     {
+         Strings.push_back(std::string(Element.AsString()));
+     }
+     return Strings;
+ }
+
+ // Builds a ChunkedParams from the "UseThreshold", "MinSize", "MaxSize" and
+ // "AvgSize" fields of a compact-binary object. The value handed to each As*()
+ // accessor (true for the threshold flag, DefaultChunkedParams for the sizes)
+ // is presumably what the accessor returns when the field is absent or of the
+ // wrong type -- confirm against the CbFieldView documentation.
+ ChunkedParams ReadChunkParams(CbObjectView Params)
+ {
+     const bool   Threshold = Params["UseThreshold"sv].AsBool(true);
+     const size_t Minimum   = Params["MinSize"sv].AsUInt64(DefaultChunkedParams.MinSize);
+     const size_t Maximum   = Params["MaxSize"sv].AsUInt64(DefaultChunkedParams.MaxSize);
+     const size_t Average   = Params["AvgSize"sv].AsUInt64(DefaultChunkedParams.AvgSize);
+
+     ChunkedParams Parsed{.UseThreshold = Threshold, .MinSize = Minimum, .MaxSize = Maximum, .AvgSize = Average};
+     return Parsed;
+ }
+
+} // namespace
+
+// Chunking controller that declines files whose extension is on an exclusion
+// list or whose size is below a limit, and passes every other file to
+// ChunkData() using the configured ChunkedParams.
+class BasicChunkingController : public ChunkingController
+{
+public:
+    // Identifier returned by GetName() and compared against in CreateChunkingController().
+    static constexpr std::string_view Name = "BasicChunkingController"sv;
+
+    // Constructs the controller from explicit settings. The extension views are
+    // copied into owned strings, so the caller's storage need not outlive this object.
+    BasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
+                            uint64_t                          ChunkFileSizeLimit,
+                            const ChunkedParams&              ChunkingParams)
+    : m_ChunkExcludeExtensions(ExcludeExtensions.begin(), ExcludeExtensions.end())
+    , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
+    , m_ChunkingParams(ChunkingParams)
+    {
+    }
+
+    // Constructs the controller from a compact-binary object using the keys
+    // "ChunkExcludeExtensions", "ChunkFileSizeLimit" and "ChunkingParams",
+    // i.e. the same keys GetParameters() writes.
+    BasicChunkingController(CbObjectView Parameters)
+    : m_ChunkExcludeExtensions(ReadStringArray(Parameters["ChunkExcludeExtensions"sv].AsArrayView()))
+    , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
+    , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
+    {
+    }
+
+    // Chunks the file at InputPath into OutChunked.
+    // Returns false (leaving OutChunked untouched) when the file's extension is
+    // excluded or RawSize is below the size limit; returns true after ChunkData()
+    // has produced a result. BytesProcessed and AbortFlag are forwarded to ChunkData().
+    bool ProcessFile(const std::filesystem::path& InputPath,
+                     uint64_t                     RawSize,
+                     ChunkedInfoWithSource&       OutChunked,
+                     std::atomic<uint64_t>&       BytesProcessed,
+                     std::atomic<bool>&           AbortFlag) const override
+    {
+        ZEN_TRACE_CPU("BasicChunkingController::ProcessFile");
+
+        const auto ExtensionsEnd = m_ChunkExcludeExtensions.end();
+        const bool IsExcluded    = std::find(m_ChunkExcludeExtensions.begin(), ExtensionsEnd, InputPath.extension()) != ExtensionsEnd;
+        if (IsExcluded)
+        {
+            return false;
+        }
+        if (RawSize < m_ChunkFileSizeLimit)
+        {
+            return false;
+        }
+
+        BasicFile File(InputPath, BasicFile::Mode::kRead);
+        OutChunked = ChunkData(File, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
+        return true;
+    }
+
+    std::string_view GetName() const override { return Name; }
+
+    // Serializes the current settings into a compact-binary object under the
+    // keys that the CbObjectView constructor reads.
+    CbObject GetParameters() const override
+    {
+        CbObjectWriter Out;
+
+        Out.BeginArray("ChunkExcludeExtensions"sv);
+        for (const std::string& Excluded : m_ChunkExcludeExtensions)
+        {
+            Out.AddString(Excluded);
+        }
+        Out.EndArray();  // ChunkExcludeExtensions
+
+        Out.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
+
+        Out.BeginObject("ChunkingParams"sv);
+        Out.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
+        Out.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
+        Out.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
+        Out.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
+        Out.EndObject();  // ChunkingParams
+
+        return Out.Save();
+    }
+
+protected:
+    const std::vector<std::string> m_ChunkExcludeExtensions;  // extensions that are never chunked
+    const uint64_t                 m_ChunkFileSizeLimit;      // files smaller than this are not chunked
+    const ChunkedParams            m_ChunkingParams;          // passed through to ChunkData()
+};
+
+// Chunking controller that splits files with selected extensions into
+// fixed-size chunks and sends every other file through ChunkData() with the
+// configured ChunkedParams. Files smaller than the size limit are not chunked.
+class ChunkingControllerWithFixedChunking : public ChunkingController
+{
+public:
+    // Constructs the controller from explicit settings. The extension views are
+    // copied into owned strings, so the caller's storage need not outlive this object.
+    ChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
+                                        uint64_t                          ChunkFileSizeLimit,
+                                        const ChunkedParams&              ChunkingParams,
+                                        uint32_t                          FixedChunkingChunkSize)
+    : m_FixedChunkingExtensions(FixedChunkingExtensions.begin(), FixedChunkingExtensions.end())
+    , m_ChunkFileSizeLimit(ChunkFileSizeLimit)
+    , m_ChunkingParams(ChunkingParams)
+    , m_FixedChunkingChunkSize(FixedChunkingChunkSize)
+    {
+    }
+
+    // Constructs the controller from a compact-binary object using the keys
+    // "FixedChunkingExtensions", "ChunkFileSizeLimit", "ChunkingParams" and
+    // "FixedChunkingChunkSize" (16 MiB is passed as the value to use when the
+    // chunk size cannot be read), i.e. the same keys GetParameters() writes.
+    ChunkingControllerWithFixedChunking(CbObjectView Parameters)
+    : m_FixedChunkingExtensions(ReadStringArray(Parameters["FixedChunkingExtensions"sv].AsArrayView()))
+    , m_ChunkFileSizeLimit(Parameters["ChunkFileSizeLimit"sv].AsUInt64(DefaultChunkingFileSizeLimit))
+    , m_ChunkingParams(ReadChunkParams(Parameters["ChunkingParams"sv].AsObjectView()))
+    , m_FixedChunkingChunkSize(Parameters["FixedChunkingChunkSize"sv].AsUInt32(16u * 1024u * 1024u))
+    {
+    }
+
+    // Chunks the file at InputPath into OutChunked.
+    //
+    // Returns false when RawSize is below the size limit, or when AbortFlag is
+    // observed set while fixed-size chunking is in progress (OutChunked may then
+    // hold a partial result). Returns true once a chunking result has been written.
+    //
+    // Files whose extension is in the fixed-chunking list are cut into pieces of
+    // m_FixedChunkingChunkSize bytes (the last piece may be shorter); identical
+    // pieces are stored once and referenced by index from ChunkSequence.
+    // All other files go through ChunkData(). BytesProcessed is advanced as data
+    // is consumed on either path.
+    virtual bool ProcessFile(const std::filesystem::path& InputPath,
+                             uint64_t                     RawSize,
+                             ChunkedInfoWithSource&       OutChunked,
+                             std::atomic<uint64_t>&       BytesProcessed,
+                             std::atomic<bool>&           AbortFlag) const override
+    {
+        ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile");
+        if (RawSize < m_ChunkFileSizeLimit)
+        {
+            return false;
+        }
+
+        // A chunk size of zero (possible when parameters are deserialized) would
+        // divide by zero below and never advance Offset, so such a configuration
+        // falls back to the ChunkData() path instead of fixed chunking.
+        const bool FixedChunking =
+            (m_FixedChunkingChunkSize != 0) &&
+            (std::find(m_FixedChunkingExtensions.begin(), m_FixedChunkingExtensions.end(), InputPath.extension()) !=
+             m_FixedChunkingExtensions.end());
+
+        if (FixedChunking)
+        {
+            ZEN_TRACE_CPU("FixedChunking");
+            IoHashStream FullHash;
+            IoBuffer     Source = IoBufferBuilder::MakeFromFile(InputPath);
+            uint64_t     Offset = 0;
+
+            // Maps the hash of each chunk already emitted to its index in ChunkHashes.
+            tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+            ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize));
+            while (Offset < RawSize)
+            {
+                if (AbortFlag)
+                {
+                    return false;
+                }
+                const uint64_t   ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize);
+                IoBuffer         Chunk(Source, Offset, ChunkSize);
+                const MemoryView ChunkView = Chunk.GetView();
+                FullHash.Append(ChunkView);
+
+                const IoHash ChunkHash = IoHash::HashBuffer(ChunkView);
+                if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end())
+                {
+                    // Same content seen before: reference the existing chunk.
+                    OutChunked.Info.ChunkSequence.push_back(It->second);
+                }
+                else
+                {
+                    const uint32_t ChunkIndex = gsl::narrow<uint32_t>(OutChunked.Info.ChunkHashes.size());
+                    OutChunked.Info.ChunkHashes.push_back(ChunkHash);
+                    OutChunked.Info.ChunkSequence.push_back(ChunkIndex);
+                    OutChunked.ChunkSources.push_back({.Offset = Offset, .Size = gsl::narrow<uint32_t>(ChunkSize)});
+                    // Record the new chunk so later identical chunks hit the lookup above.
+                    ChunkHashToChunkIndex.insert({ChunkHash, ChunkIndex});
+                }
+                Offset += ChunkSize;
+                BytesProcessed.fetch_add(ChunkSize);
+            }
+            OutChunked.Info.RawSize = RawSize;
+            OutChunked.Info.RawHash = FullHash.GetHash();
+            return true;
+        }
+        else
+        {
+            BasicFile Buffer(InputPath, BasicFile::Mode::kRead);
+            // Forward AbortFlag as BasicChunkingController does, so cancellation is honoured on this path too.
+            OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag);
+            return true;
+        }
+    }
+
+    virtual std::string_view GetName() const override { return Name; }
+
+    // Serializes the current settings into a compact-binary object under the
+    // keys that the CbObjectView constructor reads.
+    virtual CbObject GetParameters() const override
+    {
+        CbObjectWriter Writer;
+        Writer.BeginArray("FixedChunkingExtensions"sv);
+        {
+            for (const std::string& Extension : m_FixedChunkingExtensions)
+            {
+                Writer.AddString(Extension);
+            }
+        }
+        Writer.EndArray();  // FixedChunkingExtensions
+        Writer.AddInteger("ChunkFileSizeLimit"sv, m_ChunkFileSizeLimit);
+        Writer.BeginObject("ChunkingParams"sv);
+        {
+            Writer.AddBool("UseThreshold"sv, m_ChunkingParams.UseThreshold);
+
+            Writer.AddInteger("MinSize"sv, (uint64_t)m_ChunkingParams.MinSize);
+            Writer.AddInteger("MaxSize"sv, (uint64_t)m_ChunkingParams.MaxSize);
+            Writer.AddInteger("AvgSize"sv, (uint64_t)m_ChunkingParams.AvgSize);
+        }
+        Writer.EndObject();  // ChunkingParams
+        Writer.AddInteger("FixedChunkingChunkSize"sv, m_FixedChunkingChunkSize);
+        return Writer.Save();
+    }
+
+    // Identifier returned by GetName() and compared against in CreateChunkingController().
+    static constexpr std::string_view Name = "ChunkingControllerWithFixedChunking"sv;
+
+protected:
+    const std::vector<std::string> m_FixedChunkingExtensions;  // extensions that get fixed-size chunks
+    const uint64_t                 m_ChunkFileSizeLimit;       // files smaller than this are not chunked
+    const ChunkedParams            m_ChunkingParams;           // passed through to ChunkData()
+    const uint32_t                 m_FixedChunkingChunkSize;   // size in bytes of each fixed chunk
+};
+
+// Creates a BasicChunkingController from explicit settings and hands it back
+// through the ChunkingController interface.
+std::unique_ptr<ChunkingController>
+CreateBasicChunkingController(std::span<const std::string_view> ExcludeExtensions,
+                              uint64_t                          ChunkFileSizeLimit,
+                              const ChunkedParams&              ChunkingParams)
+{
+    auto Controller = std::make_unique<BasicChunkingController>(ExcludeExtensions, ChunkFileSizeLimit, ChunkingParams);
+    return Controller;
+}
+// Creates a BasicChunkingController whose settings are read from a
+// compact-binary parameter object.
+std::unique_ptr<ChunkingController>
+CreateBasicChunkingController(CbObjectView Parameters)
+{
+    auto Controller = std::make_unique<BasicChunkingController>(Parameters);
+    return Controller;
+}
+
+// Creates a ChunkingControllerWithFixedChunking from explicit settings and
+// hands it back through the ChunkingController interface.
+std::unique_ptr<ChunkingController>
+CreateChunkingControllerWithFixedChunking(std::span<const std::string_view> FixedChunkingExtensions,
+                                          uint64_t                          ChunkFileSizeLimit,
+                                          const ChunkedParams&              ChunkingParams,
+                                          uint32_t                          FixedChunkingChunkSize)
+{
+    auto Controller = std::make_unique<ChunkingControllerWithFixedChunking>(FixedChunkingExtensions,
+                                                                            ChunkFileSizeLimit,
+                                                                            ChunkingParams,
+                                                                            FixedChunkingChunkSize);
+    return Controller;
+}
+// Creates a ChunkingControllerWithFixedChunking whose settings are read from
+// a compact-binary parameter object.
+std::unique_ptr<ChunkingController>
+CreateChunkingControllerWithFixedChunking(CbObjectView Parameters)
+{
+    auto Controller = std::make_unique<ChunkingControllerWithFixedChunking>(Parameters);
+    return Controller;
+}
+
+// Creates the controller whose Name constant equals the given name, configured
+// from Parameters. Returns an empty pointer when the name matches neither
+// BasicChunkingController nor ChunkingControllerWithFixedChunking.
+std::unique_ptr<ChunkingController>
+CreateChunkingController(std::string_view Name, CbObjectView Parameters)
+{
+    const bool WantsBasic = (Name == BasicChunkingController::Name);
+    if (WantsBasic)
+    {
+        return CreateBasicChunkingController(Parameters);
+    }
+
+    const bool WantsFixed = (Name == ChunkingControllerWithFixedChunking::Name);
+    if (WantsFixed)
+    {
+        return CreateChunkingControllerWithFixedChunking(Parameters);
+    }
+
+    return nullptr;
+}
+
+} // namespace zen