From 02c19b524e54bc8f2fc36fecd43310a5ed665fcd Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Sat, 22 May 2021 11:41:08 +0200 Subject: Structured cache changes - Changed cachestore to use BasicFile and TCasLog instead of local variants - Added structured cache persistence tests --- zenserver-test/zenserver-test.cpp | 69 ++++++++---- zenserver/cache/cachestore.cpp | 232 ++++++-------------------------------- 2 files changed, 80 insertions(+), 221 deletions(-) diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index b9fa96ca0..dbe7ac9b9 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1100,38 +1100,65 @@ TEST_CASE("z$.basic") const uint16_t PortNumber = 13337; - ZenServerInstance Instance1(TestEnv); - Instance1.SetTestDir(TestDir); - Instance1.SpawnServer(PortNumber); - Instance1.WaitUntilReady(); + const int kIterationCount = 100; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); - auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + { + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); - // Populate with some simple data + // Populate with some simple data - for (int i = 0; i < 100; ++i) - { - zen::CbObjectWriter Cbo; - Cbo << "index" << i; + for (int i = 0; i < kIterationCount; ++i) + { + zen::CbObjectWriter Cbo; + Cbo << "index" << i; + + zen::MemoryOutStream MemOut; + zen::BinaryWriter Writer{MemOut}; + Cbo.Save(Writer); + + zen::IoHash Key = zen::IoHash::HashMemory(&i, sizeof i); + + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}, + cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}}); - zen::MemoryOutStream MemOut; - zen::BinaryWriter Writer{MemOut}; - Cbo.Save(Writer); + CHECK(Result.status_code == 201); + } + + // Retrieve data + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = zen::IoHash::HashMemory(&i, sizeof i); - zen::IoHash Key = zen::IoHash::HashMemory(&i, sizeof i); + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}); - cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}, - cpr::Body{(const char*)MemOut.Data(), MemOut.Size()}, - cpr::Header{{"Content-Type", "application/x-ue-cb"}}); + CHECK(Result.status_code == 200); + } } - // Retrieve data + // Verify that the data persists between process runs (the previous server has exited at this point) - for (int i = 0; i < 100; ++i) { - zen::IoHash Key = zen::IoHash::HashMemory(&i, sizeof i); + ZenServerInstance Instance1(TestEnv); + Instance1.SetTestDir(TestDir); + Instance1.SpawnServer(PortNumber); + Instance1.WaitUntilReady(); - cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}); + // Retrieve data again + + for (int i = 0; i < kIterationCount; ++i) + { + zen::IoHash Key = zen::IoHash::HashMemory(&i, sizeof i); + + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(BaseUri, "test", Key)}); + + CHECK(Result.status_code == 200); + } } } diff --git a/zenserver/cache/cachestore.cpp b/zenserver/cache/cachestore.cpp index 1db58c1e6..f99caa24f 100644 --- a/zenserver/cache/cachestore.cpp +++ b/zenserver/cache/cachestore.cpp @@ -5,14 +5,18 @@ #include #include -#include -#include #include #include #include #include #include +#include #include +#include + +#include +#include +#include #include #include #include @@ -685,178 +689,6 @@ ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCache ////////////////////////////////////////////////////////////////////////// -class ZenFile -{ -public: - void Open(std::filesystem::path FileName, bool IsCreate); - void Read(void* Data, uint64_t Size, uint64_t Offset); - void Write(const void* Data, uint64_t Size, uint64_t Offset); - void Flush(); - void* Handle() { return m_File; } - -private: - CAtlFile m_File; -}; - -void -ZenFile::Open(std::filesystem::path FileName, bool isCreate) -{ - const DWORD dwCreationDisposition = isCreate ? CREATE_ALWAYS : OPEN_EXISTING; - - HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), std::system_category(), "Failed to open bucket sobs file"); - } -} - -void -ZenFile::Read(void* Data, uint64_t Size, uint64_t Offset) -{ - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(Offset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(Offset >> 32); - - HRESULT hRes = m_File.Read(Data, gsl::narrow(Size), &Ovl); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), - std::system_category(), - "Failed to read from file '{}'"_format(zen::PathFromHandle(m_File))); - } -} - -void -ZenFile::Write(const void* Data, uint64_t Size, uint64_t Offset) -{ - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(Offset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(Offset >> 32); - - HRESULT hRes = m_File.Write(Data, gsl::narrow(Size), &Ovl); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), std::system_category(), "Failed to write to file '{}'"_format(zen::PathFromHandle(m_File))); - } -} - -void -ZenFile::Flush() -{ - m_File.Flush(); -} - -////////////////////////////////////////////////////////////////////////// - -class ZenLogFile -{ -public: - ZenLogFile(); - ~ZenLogFile(); - - void Open(std::filesystem::path FileName, size_t RecordSize, bool isCreate); - void Append(const void* DataPointer, uint64_t DataSize); - void Replay(std::function&& Handler); - void Flush(); - -private: - CAtlFile m_File; - size_t m_RecordSize = 1; - uint64_t m_AppendOffset = 0; -}; - -ZenLogFile::ZenLogFile() -{ -} - -ZenLogFile::~ZenLogFile() -{ -} - -void -ZenLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreate) -{ - m_RecordSize = RecordSize; - - const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING; - - HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), std::system_category(), "Failed to open log file" /* TODO: add path */); - } - - // TODO: write/validate header and log contents and prepare for appending/replay -} - -void -ZenLogFile::Replay(std::function&& Handler) -{ - std::vector ReadBuffer; - - ULONGLONG LogFileSize; - m_File.GetSize(LogFileSize); - - // Ensure we end up on a clean boundary - LogFileSize -= LogFileSize % m_RecordSize; - - ReadBuffer.resize(LogFileSize); - - HRESULT hRes = m_File.Read(ReadBuffer.data(), gsl::narrow(ReadBuffer.size())); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), std::system_category(), "Failed to read log file" /* TODO: add context */); - } - - const size_t EntryCount = LogFileSize / m_RecordSize; - - for (int i = 0; i < EntryCount; ++i) - { - Handler(ReadBuffer.data() + (i * m_RecordSize)); - } -} - -void -ZenLogFile::Append(const void* DataPointer, uint64_t DataSize) -{ - HRESULT hRes = m_File.Write(DataPointer, gsl::narrow(DataSize)); - - if (FAILED(hRes)) - { - throw std::system_error(GetLastError(), std::system_category(), "Failed to write to log file" /* TODO: add context */); - } -} - -void -ZenLogFile::Flush() -{ -} - -template -class TZenLogFile : public ZenLogFile -{ -public: - void Replay(std::function&& Handler) - { - ZenLogFile::Replay([&](const void* VoidPtr) { - const T& Record = *reinterpret_cast(VoidPtr); - - Handler(Record); - }); - } - - void Append(const T& Record) { ZenLogFile::Append(&Record, sizeof Record); } -}; - -////////////////////////////////////////////////////////////////////////// - #pragma pack(push) #pragma pack(1) @@ -887,21 +719,21 @@ struct ZenCacheDiskLayer::CacheBucket void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); void Flush(); - void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); - inline bool IsOk() const { return m_Ok; } +private: CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; bool m_Ok = false; uint64_t m_LargeObjectThreshold = 1024; - ZenFile m_SobsFile; - TZenLogFile m_SlogFile; - ZenFile m_SidxFile; + BasicFile m_SobsFile; + TCasLogFile m_SlogFile; + BasicFile m_SidxFile; void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); + void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); RwLock m_IndexLock; std::unordered_map m_Index; @@ -923,10 +755,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) m_BucketDir = BucketDir; - std::wstring ManifestPath{(m_BucketDir / "zen_manifest").c_str()}; - std::wstring SobsPath{(m_BucketDir / "zen.sobs").c_str()}; - std::wstring SlogPath{(m_BucketDir / "zen.slog").c_str()}; - std::wstring SidxPath{(m_BucketDir / "zen.sidx").c_str()}; + std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path SobsPath{m_BucketDir / "zen.sobs"}; + std::filesystem::path SlogPath{m_BucketDir / "zen.slog"}; + std::filesystem::path SidxPath{m_BucketDir / "zen.sidx"}; CAtlFile ManifestFile; @@ -963,7 +795,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) if (FAILED(hRes)) { - throw std::system_error(GetLastError(), std::system_category(), "Failed to create bucket manifest"); + ThrowLastError("Failed to create bucket manifest '{}'"_format(ManifestPath)); } m_BucketId.Generate(); @@ -980,9 +812,9 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) // Open and replay log - m_SlogFile.Open(SlogPath, sizeof(DiskIndexEntry), IsNew); + m_SlogFile.Open(SlogPath, IsNew); - uint64_t maxFileOffset = 0; + uint64_t MaxFileOffset = 0; { // This is not technically necessary but may help future static analysis @@ -991,11 +823,11 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; - maxFileOffset = std::max(maxFileOffset, Record.Location.Offset + Record.Location.Size); + MaxFileOffset = std::max(MaxFileOffset, Record.Location.Offset + Record.Location.Size); }); } - m_WriteCursor = (maxFileOffset + 15) & ~15; + m_WriteCursor = (MaxFileOffset + 15) & ~15; m_Ok = true; } @@ -1028,17 +860,17 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O } } - WideStringBuilder<128> dataFilePath; - BuildPath(dataFilePath, HashKey); + WideStringBuilder<128> DataFilePath; + BuildPath(DataFilePath, HashKey); - zen::IoBuffer data = IoBufferBuilder::MakeFromFile(dataFilePath.c_str()); + zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()); - if (!data) + if (!Data) { return false; } - OutValue.Value = data; + OutValue.Value = Data; // TODO: should populate index? @@ -1064,26 +896,26 @@ ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheVa auto it = m_Index.find(HashKey); - DiskLocation loc{.Offset = m_WriteCursor, .Size = gsl::narrow(Value.Value.Size())}; + DiskLocation Loc{.Offset = m_WriteCursor, .Size = gsl::narrow(Value.Value.Size())}; - m_WriteCursor = (m_WriteCursor + loc.Size + 15) & ~15; + m_WriteCursor = (m_WriteCursor + Loc.Size + 15) & ~15; if (it == m_Index.end()) { - m_Index.insert({HashKey, loc}); + m_Index.insert({HashKey, Loc}); } else { // TODO: should check if write is idempotent and bail out if it is? - it->second = loc; + it->second = Loc; } - DiskIndexEntry indexEntry{.Key = HashKey, .Location = loc}; + DiskIndexEntry IndexEntry{.Key = HashKey, .Location = Loc}; - m_SlogFile.Append(indexEntry); + m_SlogFile.Append(IndexEntry); - m_SobsFile.Write(Value.Value.Data(), loc.Size, loc.Offset); + m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset); return; } -- cgit v1.2.3