aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-08-21 13:07:22 +0200
committerGitHub <[email protected]>2023-08-21 13:07:22 +0200
commitca5b35dcbde3e2da484572c821c4899763b0e0f0 (patch)
tree2d6d7a15f6c7900cf7f3c0fe4510086e4fa572a4 /src
parentoplog mirror support (#367) (diff)
downloadzen-ca5b35dcbde3e2da484572c821c4899763b0e0f0.tar.xz
zen-ca5b35dcbde3e2da484572c821c4899763b0e0f0.zip
buffered file reading for oplog (#366)
* add BasicFileBuffer for buffered read of BasicFile * Use BasicFileBuffer when reading oplog * changelog
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp21
-rw-r--r--src/zenutil/basicfile.cpp194
-rw-r--r--src/zenutil/include/zenutil/basicfile.h23
3 files changed, 233 insertions, 5 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index d20466161..421a6486f 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -272,7 +272,9 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t InvalidEntries = 0;
- IoBuffer OpBuffer;
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ IoBuffer OpBuffer(512);
+
m_Oplog.Replay(
[&](const OplogEntry& LogEntry) {
if (LogEntry.OpCoreSize == 0)
@@ -288,8 +290,7 @@ struct ProjectStore::OplogStorage : public RefCounted
}
const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
-
- m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
// Verify checksum, ignore op data if incorrect
const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
@@ -323,10 +324,20 @@ struct ProjectStore::OplogStorage : public RefCounted
void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler)
{
+ ZEN_TRACE_CPU("Store::OplogStorage::ReplayLog");
+
+ BasicFileBuffer OpBlobsBuffer(m_OpBlobs, 65536);
+ IoBuffer OpBuffer(512);
+
for (const OplogEntryAddress& Entry : Entries)
{
- CbObject Op = GetOp(Entry);
- Handler(Op);
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ if (OpBuffer.Size() < Entry.Size)
+ {
+ OpBuffer = IoBuffer(Entry.Size);
+ }
+ OpBlobsBuffer.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+ Handler(CbObject(SharedBuffer(OpBuffer)));
}
}
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp
index f91a54222..12d9cf950 100644
--- a/src/zenutil/basicfile.cpp
+++ b/src/zenutil/basicfile.cpp
@@ -536,6 +536,64 @@ LockFile::Update(CbObject Payload, std::error_code& Ec)
BasicFile::Write(Payload.GetBuffer(), 0, Ec);
}
+BasicFileBuffer::BasicFileBuffer(BasicFile& Base, uint64_t BufferSize)
+: m_Base(Base)
+, m_Buffer(nullptr)
+, m_BufferSize(BufferSize)
+, m_Size(Base.FileSize())
+, m_BufferStart(0)
+, m_BufferEnd(0)
+{
+ m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize);
+}
+
+BasicFileBuffer::~BasicFileBuffer()
+{
+ Memory::Free(m_Buffer);
+}
+
+void
+BasicFileBuffer::Read(void* Data, uint64_t Size, uint64_t FileOffset)
+{
+ if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size))
+ {
+ m_Base.Read(Data, Size, FileOffset);
+ return;
+ }
+ uint8_t* WritePtr = ((uint8_t*)Data);
+ uint64_t Begin = FileOffset;
+ uint64_t End = FileOffset + Size;
+ if (FileOffset <= m_BufferStart)
+ {
+ if (End > m_BufferStart)
+ {
+ uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
+ memcpy(WritePtr + End - Count - FileOffset, m_Buffer, Count);
+ End -= Count;
+ if (Begin == End)
+ {
+ return;
+ }
+ }
+ }
+ else if (FileOffset < m_BufferEnd)
+ {
+ uint64_t Count = Min(m_BufferEnd, End) - FileOffset;
+ memcpy(WritePtr + Begin - FileOffset, m_Buffer + Begin - m_BufferStart, Count);
+ Begin += Count;
+ if (Begin == End)
+ {
+ return;
+ }
+ }
+ m_BufferStart = Begin;
+ m_BufferEnd = Min(Begin + m_BufferSize, m_Size);
+ m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
+ uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
+ memcpy(WritePtr + Begin - FileOffset, m_Buffer, Count);
+ ZEN_ASSERT(Begin + Count == End);
+}
+
/*
___________ __
\__ ___/___ _______/ |_ ______
@@ -601,6 +659,142 @@ TEST_CASE("TemporaryFile")
}
}
+TEST_CASE("BasicFileBuffer")
+{
+ ScopedCurrentDirectoryChange _;
+ {
+ BasicFile File1;
+ const std::string_view Data = "0123456789abcdef";
+ File1.Open("buffered", BasicFile::Mode::kTruncate);
+ for (uint32_t I = 0; I < 16; ++I)
+ {
+ File1.Write(Data.data(), Data.size(), I * Data.size());
+ }
+ }
+ SUBCASE("EvenBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ // Non-primed
+ {
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 1 * 16);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "0123456789abcdef");
+ }
+ // Primed
+ {
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 1 * 16);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "0123456789abcdef");
+ }
+ }
+ SUBCASE("UnevenBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ // Non-primed
+ {
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 7);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "789abcdef0123456");
+ }
+ // Primed
+ {
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 7);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "789abcdef0123456");
+ }
+ }
+ SUBCASE("BiggerThanBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[17] = {0};
+ File1Buffer.Read(Buffer, 17, 0 * 16);
+ std::string_view Verify(Buffer, 17);
+ CHECK(Verify == "0123456789abcdef0");
+ }
+ SUBCASE("InsideBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 0 * 16);
+
+ File1Buffer.Read(Buffer, 8, 2);
+ std::string_view Verify(Buffer, 8);
+ CHECK(Verify == "23456789");
+ }
+ SUBCASE("BeginningOfBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 8);
+
+ File1Buffer.Read(Buffer, 8, 8);
+ std::string_view Verify(Buffer, 8);
+ CHECK(Verify == "89abcdef");
+ }
+ SUBCASE("EndOfBuffer")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 0 * 16);
+
+ File1Buffer.Read(Buffer, 8, 8);
+ std::string_view Verify(Buffer, 8);
+ CHECK(Verify == "89abcdef");
+ }
+ SUBCASE("OverEnd")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 0 * 16);
+
+ File1Buffer.Read(Buffer, 16, 8);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "89abcdef01234567");
+ }
+ SUBCASE("OverBegin")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 1 * 16);
+
+ File1Buffer.Read(Buffer, 16, 8);
+ std::string_view Verify(Buffer, 16);
+ CHECK(Verify == "89abcdef01234567");
+ }
+ SUBCASE("EndOfFile")
+ {
+ BasicFile File1;
+ File1.Open("buffered", BasicFile::Mode::kRead);
+ BasicFileBuffer File1Buffer(File1, 16);
+ char Buffer[16] = {0};
+ File1Buffer.Read(Buffer, 16, 0 * 16);
+
+ File1Buffer.Read(Buffer, 8, 256 - 8);
+ std::string_view Verify(Buffer, 8);
+ CHECK(Verify == "89abcdef");
+ }
+}
+
void
basicfile_forcelink()
{
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h
index 1c5a31c5c..5a27befd9 100644
--- a/src/zenutil/include/zenutil/basicfile.h
+++ b/src/zenutil/include/zenutil/basicfile.h
@@ -120,6 +120,29 @@ public:
private:
};
+/** Adds a layer of buffered reading to a BasicFile
+
+This class is not intended for concurrent access, it is not thread safe.
+
+*/
+
+class BasicFileBuffer
+{
+public:
+ BasicFileBuffer(BasicFile& Base, uint64_t BufferSize);
+ ~BasicFileBuffer();
+
+ void Read(void* Data, uint64_t Size, uint64_t FileOffset);
+
+private:
+ BasicFile& m_Base;
+ uint8_t* m_Buffer;
+ const uint64_t m_BufferSize;
+ uint64_t m_Size;
+ uint64_t m_BufferStart;
+ uint64_t m_BufferEnd;
+};
+
ZENCORE_API void basicfile_forcelink();
} // namespace zen