diff options
| author | Dan Engelbrecht <[email protected]> | 2023-08-21 13:07:22 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-21 13:07:22 +0200 |
| commit | ca5b35dcbde3e2da484572c821c4899763b0e0f0 (patch) | |
| tree | 2d6d7a15f6c7900cf7f3c0fe4510086e4fa572a4 /src | |
| parent | oplog mirror support (#367) (diff) | |
| download | zen-ca5b35dcbde3e2da484572c821c4899763b0e0f0.tar.xz zen-ca5b35dcbde3e2da484572c821c4899763b0e0f0.zip | |
buffered file reading for oplog (#366)
* add BasicFileBuffer for buffered read of BasicFile
* Use BasicFileBuffer when reading oplog
* changelog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 21 | ||||
| -rw-r--r-- | src/zenutil/basicfile.cpp | 194 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 23 |
3 files changed, 233 insertions, 5 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index d20466161..421a6486f 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -272,7 +272,9 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t InvalidEntries = 0; - IoBuffer OpBuffer; + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + IoBuffer OpBuffer(512); + m_Oplog.Replay( [&](const OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) @@ -288,8 +290,7 @@ struct ProjectStore::OplogStorage : public RefCounted } const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign; - - m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); + OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset); // Verify checksum, ignore op data if incorrect const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF); @@ -323,10 +324,20 @@ struct ProjectStore::OplogStorage : public RefCounted void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler) { + ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog"); + + BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536); + IoBuffer OpBuffer(512); + for (const OplogEntryAddress& Entry : Entries) { - CbObject Op = GetOp(Entry); - Handler(Op); + const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; + if (OpBuffer.Size() < Entry.Size) + { + OpBuffer = IoBuffer(Entry.Size); + } + OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); + Handler(CbObject(SharedBuffer(OpBuffer))); } } diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp index f91a54222..12d9cf950 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zenutil/basicfile.cpp @@ -536,6 +536,64 @@ LockFile::Update(CbObject Payload, std::error_code& Ec) BasicFile::Write(Payload.GetBuffer(), 0, Ec); } +BasicFileBuffer::BasicFileBuffer(BasicFile& Base, uint64_t BufferSize) +: m_Base(Base) +, m_Buffer(nullptr) +, m_BufferSize(BufferSize) +, m_Size(Base.FileSize()) +, m_BufferStart(0) +, m_BufferEnd(0) +{ + m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); +} + +BasicFileBuffer::~BasicFileBuffer() +{ + Memory::Free(m_Buffer); +} + +void +BasicFileBuffer::Read(void* Data, uint64_t Size, uint64_t FileOffset) +{ + if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size)) + { + m_Base.Read(Data, Size, FileOffset); + return; + } + uint8_t* WritePtr = ((uint8_t*)Data); + uint64_t Begin = FileOffset; + uint64_t End = FileOffset + Size; + if (FileOffset <= m_BufferStart) + { + if (End > m_BufferStart) + { + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + End - Count - FileOffset, m_Buffer, Count); + End -= Count; + if (Begin == End) + { + return; + } + } + } + else if (FileOffset < m_BufferEnd) + { + uint64_t Count = Min(m_BufferEnd, End) - FileOffset; + memcpy(WritePtr + Begin - FileOffset, m_Buffer + Begin - m_BufferStart, Count); + Begin += Count; + if (Begin == End) + { + return; + } + } + m_BufferStart = Begin; + m_BufferEnd = Min(Begin + m_BufferSize, m_Size); + m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + Begin - FileOffset, m_Buffer, Count); + ZEN_ASSERT(Begin + Count == End); +} + /* ___________ __ \__ ___/___ _______/ |_ ______ @@ -601,6 +659,142 @@ TEST_CASE("TemporaryFile") } } +TEST_CASE("BasicFileBuffer") +{ + ScopedCurrentDirectoryChange _; + { + BasicFile File1; + const std::string_view Data = "0123456789abcdef"; + File1.Open("buffered", BasicFile::Mode::kTruncate); + for (uint32_t I = 0; I < 16; ++I) + { + File1.Write(Data.data(), Data.size(), I * Data.size()); + } + } + SUBCASE("EvenBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + // Non-primed + { + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 1 * 16); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "0123456789abcdef"); + } + // Primed + { + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 1 * 16); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "0123456789abcdef"); + } + } + SUBCASE("UnevenBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + // Non-primed + { + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 7); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "789abcdef0123456"); + } + // Primed + { + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 7); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "789abcdef0123456"); + } + } + SUBCASE("BiggerThanBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[17] = {0}; + File1Buffer.Read(Buffer, 17, 0 * 16); + std::string_view Verify(Buffer, 17); + CHECK(Verify == "0123456789abcdef0"); + } + SUBCASE("InsideBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 0 * 16); + + File1Buffer.Read(Buffer, 8, 2); + std::string_view Verify(Buffer, 8); + CHECK(Verify == "23456789"); + } + SUBCASE("BeginningOfBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 8); + + File1Buffer.Read(Buffer, 8, 8); + std::string_view Verify(Buffer, 8); + CHECK(Verify == "89abcdef"); + } + SUBCASE("EndOfBuffer") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 0 * 16); + + File1Buffer.Read(Buffer, 8, 8); + std::string_view Verify(Buffer, 8); + CHECK(Verify == "89abcdef"); + } + SUBCASE("OverEnd") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 0 * 16); + + File1Buffer.Read(Buffer, 16, 8); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "89abcdef01234567"); + } + SUBCASE("OverBegin") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 1 * 16); + + File1Buffer.Read(Buffer, 16, 8); + std::string_view Verify(Buffer, 16); + CHECK(Verify == "89abcdef01234567"); + } + SUBCASE("EndOfFile") + { + BasicFile File1; + File1.Open("buffered", BasicFile::Mode::kRead); + BasicFileBuffer File1Buffer(File1, 16); + char Buffer[16] = {0}; + File1Buffer.Read(Buffer, 16, 0 * 16); + + File1Buffer.Read(Buffer, 8, 256 - 8); + std::string_view Verify(Buffer, 8); + CHECK(Verify == "89abcdef"); + } +} + void basicfile_forcelink() { diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h index 1c5a31c5c..5a27befd9 100644 --- a/src/zenutil/include/zenutil/basicfile.h +++ b/src/zenutil/include/zenutil/basicfile.h @@ -120,6 +120,29 @@ public: private: }; +/** Adds a layer of buffered reading to a BasicFile + +This class is not intended for concurrent access, it is not thread safe. + +*/ + +class BasicFileBuffer +{ +public: + BasicFileBuffer(BasicFile& Base, uint64_t BufferSize); + ~BasicFileBuffer(); + + void Read(void* Data, uint64_t Size, uint64_t FileOffset); + +private: + BasicFile& m_Base; + uint8_t* m_Buffer; + const uint64_t m_BufferSize; + uint64_t m_Size; + uint64_t m_BufferStart; + uint64_t m_BufferEnd; +}; + ZENCORE_API void basicfile_forcelink(); } // namespace zen |