diff options
| author | Stefan Boberg <[email protected]> | 2021-10-21 14:17:18 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-10-21 14:17:18 +0200 |
| commit | efd06036c133654a1799f398345fd1bb3cc632b6 (patch) | |
| tree | dc81e110be0aa8cb025662860843f7f4997f790a | |
| parent | Merge branch 'main' into gc (diff) | |
| parent | zenserver: Tweaked state initialization so we know when we're running for the... (diff) | |
| download | zen-efd06036c133654a1799f398345fd1bb3cc632b6.tar.xz zen-efd06036c133654a1799f398345fd1bb3cc632b6.zip | |
Merged from main
| -rw-r--r-- | zencore/compactbinarypackage.cpp | 46 | ||||
| -rw-r--r-- | zencore/compress.cpp | 193 | ||||
| -rw-r--r-- | zencore/include/zencore/compress.h | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 4 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 217 | ||||
| -rw-r--r-- | zenserver/projectstore.h | 23 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 184 |
8 files changed, 487 insertions, 183 deletions
diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp index 88757d47f..f7ce371c8 100644 --- a/zencore/compactbinarypackage.cpp +++ b/zencore/compactbinarypackage.cpp @@ -635,6 +635,11 @@ namespace legacy { Writer.AddBinary(Attachment.AsBinary()); Writer.AddBinaryAttachment(Attachment.GetHash()); } + else if (Attachment.IsCompressedBinary()) + { + Writer.AddBinary(Attachment.AsCompressedBinary().GetCompressed()); + Writer.AddBinaryAttachment(Attachment.GetHash()); + } else if (Attachment.IsNull()) { Writer.AddBinary(MemoryView()); @@ -695,17 +700,32 @@ namespace legacy { SharedBuffer Buffer = SharedBuffer::MakeView(View, ValueField.GetOuterBuffer()).MakeOwned(); CbField HashField = LoadCompactBinary(Reader, Allocator); const IoHash& Hash = HashField.AsAttachment(); - if (HashField.HasError() || IoHash::HashBuffer(Buffer) != Hash) + if (HashField.HasError()) { return false; } - if (HashField.IsObjectAttachment()) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer)) { - Package.AddAttachment(CbAttachment(CbObject(std::move(Buffer)), Hash)); + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + return false; + } + Package.AddAttachment(CbAttachment(Compressed)); } else { - Package.AddAttachment(CbAttachment(CompositeBuffer(std::move(Buffer)), Hash)); + if (IoHash::HashBuffer(Buffer) != Hash) + { + return false; + } + if (HashField.IsObjectAttachment()) + { + Package.AddAttachment(CbAttachment(CbObject(std::move(Buffer)), Hash)); + } + else + { + Package.AddAttachment(CbAttachment(CompositeBuffer(std::move(Buffer)), Hash)); + } } } } @@ -714,8 +734,22 @@ namespace legacy { const IoHash Hash = ValueField.AsHash(); ZEN_ASSERT(Mapper); - - Package.AddAttachment(CbAttachment((*Mapper)(Hash), Hash)); + if (SharedBuffer AttachmentData = (*Mapper)(Hash)) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(AttachmentData)) + { + Package.AddAttachment(CbAttachment(Compressed)); + } + else + { + const CbValidateError ValidationResult = ValidateCompactBinary(AttachmentData.GetView(), CbValidateMode::All); + if (ValidationResult != CbValidateError::None) + { + return false; + } + Package.AddAttachment(CbAttachment(CbObject(std::move(AttachmentData)), Hash)); + } + } } else { diff --git a/zencore/compress.cpp b/zencore/compress.cpp index 8d309e010..4a8d116fa 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -690,6 +690,90 @@ ValidBufferOrEmpty(BufferType&& CompressedData) return BufferHeader::IsValid(CompressedData) ? CompositeBuffer(std::forward<BufferType>(CompressedData)) : CompositeBuffer(); } +CompositeBuffer +CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize) +{ + if (Header.Method == CompressionMethod::None) + { + UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(RawSize); + CompressedData.CopyTo(NewCompressedData.GetMutableView(), sizeof(Header) + RawOffset); + + BufferHeader NewHeader = Header; + NewHeader.Crc32 = 0; + NewHeader.TotalRawSize = RawSize; + NewHeader.TotalCompressedSize = NewHeader.TotalRawSize + sizeof(BufferHeader); + NewHeader.RawHash = BLAKE3(); + + UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(BufferHeader)); + NewHeader.Write(HeaderData); + + return CompositeBuffer(HeaderData.MoveToShared(), NewCompressedData.MoveToShared()); + } + else + { + UniqueBuffer BlockSizeBuffer; + MemoryView BlockSizeView = + CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); + + const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; + const uint64_t LastBlockSize = BlockSize - ((Header.BlockCount * BlockSize) - Header.TotalRawSize); + const size_t FirstBlock = uint64_t(RawOffset / BlockSize); + const size_t LastBlock = uint64_t((RawOffset + RawSize - 1) / BlockSize); + uint64_t CompressedOffset = sizeof(BufferHeader) + uint64_t(Header.BlockCount) * sizeof(uint32_t); + + const uint64_t NewBlockCount = LastBlock - FirstBlock + 1; + const uint64_t NewMetaSize = NewBlockCount * sizeof(uint32_t); + uint64_t NewCompressedSize = 0; + uint64_t NewTotalRawSize = 0; + std::vector<uint32_t> NewCompressedBlockSizes; + + NewCompressedBlockSizes.reserve(NewBlockCount); + for (size_t BlockIndex = FirstBlock; BlockIndex <= LastBlock; ++BlockIndex) + { + const uint64_t UncompressedBlockSize = (BlockIndex == Header.BlockCount - 1) ? LastBlockSize : BlockSize; + NewTotalRawSize += UncompressedBlockSize; + + const uint32_t CompressedBlockSize = CompressedBlockSizes[BlockIndex]; + NewCompressedBlockSizes.push_back(CompressedBlockSize); + NewCompressedSize += ByteSwap(CompressedBlockSize); + } + + const uint64_t NewTotalCompressedSize = sizeof(BufferHeader) + NewBlockCount * sizeof(uint32_t) + NewCompressedSize; + UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(NewTotalCompressedSize); + MutableMemoryView NewCompressedBlocks = NewCompressedData.GetMutableView() + sizeof(BufferHeader) + NewMetaSize; + + // Seek to first compressed block + for (size_t BlockIndex = 0; BlockIndex < FirstBlock; ++BlockIndex) + { + const uint64_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + CompressedOffset += CompressedBlockSize; + } + + // Copy blocks + UniqueBuffer CompressedBlockCopy; + const MemoryView CompressedRange = CompressedData.ViewOrCopyRange(CompressedOffset, NewCompressedSize, CompressedBlockCopy); + NewCompressedBlocks.CopyFrom(CompressedRange); + + // Copy block sizes + NewCompressedData.GetMutableView().Mid(sizeof(BufferHeader), NewMetaSize).CopyFrom(MakeMemoryView(NewCompressedBlockSizes)); + + BufferHeader NewHeader; + NewHeader.Crc32 = 0; + NewHeader.Method = Header.Method; + NewHeader.Compressor = Header.Compressor; + NewHeader.CompressionLevel = Header.CompressionLevel; + NewHeader.BlockSizeExponent = Header.BlockSizeExponent; + NewHeader.BlockCount = static_cast<uint32_t>(NewBlockCount); + NewHeader.TotalRawSize = NewTotalRawSize; + NewHeader.TotalCompressedSize = NewTotalCompressedSize; + NewHeader.RawHash = BLAKE3(); + NewHeader.Write(NewCompressedData.GetMutableView().Left(sizeof(BufferHeader) + NewMetaSize)); + + return CompositeBuffer(NewCompressedData.MoveToShared()); + } +} + } // namespace zen::detail namespace zen { @@ -774,6 +858,17 @@ CompressedBuffer::GetRawHash() const return CompressedData ? detail::BufferHeader::Read(CompressedData).RawHash : BLAKE3(); } +CompressedBuffer +CompressedBuffer::CopyRange(uint64_t RawOffset, uint64_t RawSize) const +{ + using namespace detail; + const BufferHeader Header = BufferHeader::Read(CompressedData); + CompressedBuffer Range; + Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, RawSize); + + return Range; +} + bool CompressedBuffer::TryDecompressTo(MutableMemoryView RawView, uint64_t RawOffset) const { @@ -1040,6 +1135,104 @@ TEST_CASE("CompressedBuffer") ValidateData(Values, ExpectedValues, OffsetCount); } } + + SUBCASE("copy range") + { + const uint64_t BlockSize = 64 * sizeof(uint64_t); + const uint64_t N = 1000; + std::vector<uint64_t> ExpectedValues = GenerateData(N); + + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(MakeMemoryView(ExpectedValues)), + OodleCompressor::Mermaid, + OodleCompressionLevel::Optimal4, + BlockSize); + + { + const uint64_t OffsetCount = 0; + const uint64_t Count = N; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64; + const uint64_t Count = N - 64; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64 * 2 + 32; + const uint64_t Count = N - OffsetCount; + const uint64_t RawOffset = OffsetCount * sizeof(uint64_t); + const uint64_t RawSize = Count * sizeof(uint64_t); + uint64_t FirstBlockOffset = RawOffset % BlockSize; + + SharedBuffer Uncompressed = Compressed.CopyRange(RawOffset, RawSize).Decompress(); + std::span<uint64_t const> AllValues((const uint64_t*)Uncompressed.GetData(), RawSize / sizeof(uint64_t)); + std::span<uint64_t const> Values((const uint64_t*)(((const uint8_t*)(Uncompressed.GetData()) + FirstBlockOffset)), + RawSize / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64 * 2 + 63; + const uint64_t Count = N - OffsetCount - 5; + const uint64_t RawOffset = OffsetCount * sizeof(uint64_t); + const uint64_t RawSize = Count * sizeof(uint64_t); + uint64_t FirstBlockOffset = RawOffset % BlockSize; + + SharedBuffer Uncompressed = Compressed.CopyRange(RawOffset, RawSize).Decompress(); + std::span<uint64_t const> AllValues((const uint64_t*)Uncompressed.GetData(), RawSize / sizeof(uint64_t)); + std::span<uint64_t const> Values((const uint64_t*)(((const uint8_t*)(Uncompressed.GetData()) + FirstBlockOffset)), + RawSize / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + } + + SUBCASE("copy uncompressed range") + { + const uint64_t BlockSize = 64 * sizeof(uint64_t); + const uint64_t N = 1000; + std::vector<uint64_t> ExpectedValues = GenerateData(N); + + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(MakeMemoryView(ExpectedValues)), + OodleCompressor::NotSet, + OodleCompressionLevel::None); + + { + const uint64_t OffsetCount = 0; + const uint64_t Count = N; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 1; + const uint64_t Count = N - OffsetCount; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 42; + const uint64_t Count = 100; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + } } void diff --git a/zencore/include/zencore/compress.h b/zencore/include/zencore/compress.h index 426b4981a..d37ecfa79 100644 --- a/zencore/include/zencore/compress.h +++ b/zencore/include/zencore/compress.h @@ -105,6 +105,8 @@ public: /** Returns the hash of the raw data. Zero on error or if this is null. */ [[nodiscard]] ZENCORE_API BLAKE3 GetRawHash() const; + [[nodiscard]] ZENCORE_API CompressedBuffer CopyRange(uint64_t RawOffset, uint64_t RawSize = ~uint64_t(0)) const; + /** * Returns the compressor and compression level used by this buffer. * diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 6da60aa06..23be9f729 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1013,7 +1013,7 @@ TEST_CASE("project.basic") { uint8_t AttachData[] = {1, 2, 3}; - zen::CbAttachment Attach{zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})}; + zen::CbAttachment Attach{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}))}; zen::CbObjectWriter OpWriter; OpWriter << "key" @@ -2136,7 +2136,7 @@ TEST_CASE("http.basics") { cpr::Response r = cpr::Get(cpr::Url{"{}/testing/hello"_format(BaseUri)}); - CHECK_EQ(r.status_code, 200); + CHECK(IsHttpSuccessCode(r.status_code)); } { diff --git a/zenserver/config.h b/zenserver/config.h index d68549616..2a5a17fb9 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -65,6 +65,7 @@ struct ZenServiceConfig { bool StructuredCacheEnabled = true; bool ShouldCrash = false; // Option for testing crash handling + bool IsFirstRun = false; #if ZEN_ENABLE_MESH bool MeshEnabled = false; // Experimental p2p mesh discovery #endif diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d54d03450..73d61c124 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -272,9 +272,9 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath) -: m_OuterProject(Outer) -, m_CasStore(Store) +ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +: m_OuterProject(Project) +, m_CidStore(Store) , m_OplogId(Id) , m_BasePath(BasePath) { @@ -319,7 +319,10 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) { _.ReleaseNow(); - return m_CasStore.FindChunk(ChunkIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) @@ -328,14 +331,20 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; - return IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + FileChunk.SetContentType(ZenContentType::kBinary); + + return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { _.ReleaseNow(); - return m_CasStore.FindChunk(MetaIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(MetaIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } return {}; @@ -540,38 +549,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) for (const auto& Attach : Attachments) { - IoBuffer AttachmentData; - - if (Attach.IsBinary()) - { - AttachmentData = Attach.AsBinary().AsIoBuffer(); - } - else if (Attach.IsCompressedBinary()) - { - ZEN_NOT_IMPLEMENTED("Compressed binary attachments are currently not supported for oplogs"); - - AttachmentData = Attach.AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer(); - } - else if (Attach.IsObject()) - { - AttachmentData = Attach.AsObject().GetBuffer().AsIoBuffer(); - } - else - { - ZEN_NOT_IMPLEMENTED("Unknown attachment type"); - } - - ZEN_ASSERT(AttachmentData); + ZEN_ASSERT(Attach.IsCompressedBinary()); - CasStore::InsertResult Result = m_CasStore.InsertChunk(AttachmentData, Attach.GetHash()); + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.GetRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); - const uint64_t AttachmentSize = AttachmentData.Size(); - - if (Result.New) + if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } - AttachmentBytes += AttachmentSize; } @@ -587,9 +574,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) ////////////////////////////////////////////////////////////////////////// -ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath) +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) -, m_CasStore(Store) +, m_CidStore(Store) , m_OplogStoragePath(BasePath) { } @@ -678,7 +665,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; return &Log; } @@ -718,7 +705,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; Log.ReplayLog(); @@ -787,10 +774,10 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath) : m_Log(zen::logging::Get("project")) , m_ProjectBasePath(BasePath) -, m_CasStore(Store) +, m_CidStore(Store) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); @@ -855,7 +842,7 @@ ProjectStore::OpenProject(std::string_view ProjectId) { ZEN_INFO("opening project {} @ {}", ProjectId, ProjectBasePath); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, ProjectBasePath).first->second; Prj.Identifier = ProjectId; Prj.Read(); return &Prj; @@ -879,7 +866,7 @@ ProjectStore::NewProject(std::filesystem::path BasePath, { RwLock::ExclusiveLockScope _(m_ProjectsLock); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, BasePath).first->second; Prj.Identifier = ProjectId; Prj.RootDir = RootDir; Prj.EngineRootDir = EngineRootDir; @@ -920,9 +907,9 @@ ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view Oplo ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) -: m_CasStore(Store) -, m_Log(logging::Get("project")) +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +: m_Log(logging::Get("project")) +, m_CidStore(Store) , m_ProjectStore(Projects) { using namespace std::literals; @@ -1122,16 +1109,24 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) Oid Obj = Oid::FromHexString(ChunkId); - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - if (Value) + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - CbObjectWriter Response; - Response << "size" << Value.Size(); - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + ChunkSize = Compressed.GetRawSize(); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + CbObjectWriter Response; + Response << "size" << ChunkSize; + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); @@ -1176,50 +1171,81 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) } } - ZEN_DEBUG("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { + m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; + Oid Obj = Oid::FromHexString(ChunkId); - Oid Obj = Oid::FromHexString(ChunkId); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Value = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); - switch (HttpVerb Verb = HttpReq.RequestVerb()) + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - case HttpVerb::kHead: - case HttpVerb::kGet: - if (!Value) + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + if ((Offset + Size) > Compressed.GetRawSize()) { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + Size = Compressed.GetRawSize() - Offset; } - if (IsOffset) + if (AcceptType == HttpContentType::kBinary) { - if (Offset > Value.Size()) - { - Offset = Value.Size(); - } - - if ((Offset + Size) > Value.Size()) - { - Size = Value.Size() - Offset; - } - - // Send only a subset of data - IoBuffer InnerValue(Value, Offset, Size); - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue); + Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); + ContentType = HttpContentType::kBinary; } - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + else + { + Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + else + { + Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + Value = IoBuffer(Chunk, Offset, Size); } + + m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet | HttpVerb::kHead); @@ -1228,20 +1254,31 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& HashString = Req.GetCapture(3); + const auto& HashString = Req.GetCapture(3); + IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType AcceptType = HttpReq.AcceptContentType(); - ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } - IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType ContentType = HttpContentType::kCompressedBinary; + IoBuffer Value = m_CidStore.FindChunkByCid(Hash); - if (IoBuffer Value = m_CasStore.FindChunk(Hash)) + if (!Value) { - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet); @@ -1273,7 +1310,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) { const IoHash FileHash = Entry.AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.FindChunkByCid(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); @@ -1349,11 +1386,11 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); - if (IoBuffer Data = m_CasStore.FindChunk(Hash)) + if (IoBuffer CompressedData = m_CidStore.FindChunkByCid(Hash)) { - return SharedBuffer(std::move(Data)); + return SharedBuffer(std::move(CompressedData)); } - else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + else if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { return SharedBuffer(std::move(Data)); } @@ -1403,7 +1440,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId); HttpReq.WriteResponse(HttpResponseCode::Created); }, @@ -1473,7 +1510,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog {} / {}", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -1487,7 +1524,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) case HttpVerb::kDelete: { - ZEN_INFO("deleting oplog {}/{}", ProjectId, OplogId); + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); ProjectIt->DeleteOplog(OplogId); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index e545d78b9..c9f49217a 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -7,6 +7,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <tsl/robin_map.h> #include <zencore/logging.h> @@ -50,14 +51,14 @@ class ProjectStore : public RefCounted struct OplogStorage; public: - ProjectStore(CasStore& Store, std::filesystem::path BasePath); + ProjectStore(CidStore& Store, std::filesystem::path BasePath); ~ProjectStore(); struct Project; struct Oplog { - Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath); + Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath); ~Oplog(); [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); @@ -116,7 +117,7 @@ public: using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; Project* m_OuterProject = nullptr; - CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_BasePath; std::filesystem::path m_TempPath; @@ -148,7 +149,7 @@ public: void DeleteOplog(std::string_view OplogId); void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; - Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath); + Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); ~Project(); void Read(); @@ -160,7 +161,7 @@ public: private: ProjectStore* m_ProjectStore; - CasStore& m_CasStore; + CidStore& m_CidStore; mutable RwLock m_ProjectLock; std::map<std::string, Oplog> m_Oplogs; std::filesystem::path m_OplogStoragePath; @@ -186,7 +187,7 @@ public: private: spdlog::logger& m_Log; - CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_ProjectBasePath; RwLock m_ProjectsLock; std::map<std::string, Project> m_Projects; @@ -216,19 +217,19 @@ private: class HttpProjectService : public HttpService { public: - HttpProjectService(CasStore& Store, ProjectStore* Projects); + HttpProjectService(CidStore& Store, ProjectStore* InProjectStore); ~HttpProjectService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; private: - CasStore& m_CasStore; + inline spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; - - inline spdlog::logger& Log() { return m_Log; } }; /** Project store interface for local clients @@ -242,7 +243,7 @@ private: class LocalProjectService : public RefCounted { protected: - LocalProjectService(CasStore& Store, ProjectStore* Projects); + LocalProjectService(CasStore& CasStore, ProjectStore* Projects); ~LocalProjectService(); public: diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index d3a0afaed..c96fec986 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -116,6 +116,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; +using namespace fmt::literals; class ZenServer : public IHttpStatusProvider { @@ -160,77 +161,7 @@ public: throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str()); } - // Check root manifest to deal with schema versioning - - bool WipeState = false; - std::string WipeReason = "Unspecified"; - - std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; - FileContents ManifestData = zen::ReadFile(ManifestPath); - - if (ManifestData.ErrorCode) - { - WipeState = true; - WipeReason = "No manifest present at '{}'"_format(ManifestPath); - } - else - { - IoBuffer Manifest = ManifestData.Flatten(); - - if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); - ValidationResult != CbValidateError::None) - { - ZEN_ERROR("Manifest validation failed: {}, state will be wiped", ValidationResult); - - WipeState = true; - WipeReason = "Validation of manifest at '{}' failed: {}"_format(ManifestPath, ValidationResult); - } - else - { - m_RootManifest = LoadCompactBinaryObject(Manifest); - - const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); - - if (ManifestVersion != ZEN_SCHEMA_VERSION) - { - WipeState = true; - WipeReason = "Manifest schema version: {}, differs from required: {}"_format(ManifestVersion, ZEN_SCHEMA_VERSION); - } - } - } - - // Handle any state wipe - - if (WipeState) - { - ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); - - std::error_code Ec; - for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) - { - if (DirEntry.is_directory()) - { - ZEN_INFO("Deleting '{}'", DirEntry.path()); - - std::filesystem::remove_all(DirEntry.path(), Ec); - - if (Ec) - { - ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); - } - } - } - - ZEN_INFO("Wiped all directories in data root"); - - // Write new manifest - - CbObjectWriter Cbo; - Cbo << "schema_version" << ZEN_SCHEMA_VERSION; - m_RootManifest = Cbo.Save(); - - WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); - } + InitializeState(ServiceConfig); // Ok so now we're configured, let's kick things off @@ -254,8 +185,8 @@ public: ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CasStore, m_DataRoot / "projects"); - m_HttpProjectService.reset(new zen::HttpProjectService{*m_CasStore, m_ProjectStore}); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects"); + m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); #if ZEN_USE_NAMED_PIPES m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); @@ -329,6 +260,7 @@ public: } } + void InitializeState(ZenServiceConfig& ServiceConfig); void InitializeStructuredCache(ZenServiceConfig& ServiceConfig); #if ZEN_ENABLE_MESH @@ -341,7 +273,9 @@ public: void Run() { - Scrub(); + // This is disabled for now, awaiting better scheduling + // + // Scrub(); if (m_ProcessMonitor.IsActive()) { @@ -567,6 +501,101 @@ private: }; void +ZenServer::InitializeState(ZenServiceConfig& ServiceConfig) +{ + // Check root manifest to deal with schema versioning + + bool WipeState = false; + std::string WipeReason = "Unspecified"; + + bool UpdateManifest = false; + std::filesystem::path ManifestPath = m_DataRoot / "root_manifest"; + FileContents ManifestData = zen::ReadFile(ManifestPath); + + if (ManifestData.ErrorCode) + { + if (ServiceConfig.IsFirstRun) + { + ZEN_INFO("Initializing state at '{}'", m_DataRoot); + + UpdateManifest = true; + } + else + { + WipeState = true; + WipeReason = "No manifest present at '{}'"_format(ManifestPath); + } + } + else + { + IoBuffer Manifest = ManifestData.Flatten(); + + if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); + ValidationResult != CbValidateError::None) + { + ZEN_ERROR("Manifest validation failed: {}, state will be wiped", ValidationResult); + + WipeState = true; + WipeReason = "Validation of manifest at '{}' failed: {}"_format(ManifestPath, ValidationResult); + } + else + { + m_RootManifest = LoadCompactBinaryObject(Manifest); + + const int32_t ManifestVersion = m_RootManifest["schema_version"].AsInt32(0); + + if (ManifestVersion != ZEN_SCHEMA_VERSION) + { + WipeState = true; + WipeReason = "Manifest schema version: {}, differs from required: {}"_format(ManifestVersion, ZEN_SCHEMA_VERSION); + } + } + } + + // Handle any state wipe + + if (WipeState) + { + ZEN_WARN("Wiping state at '{}' - reason: '{}'", m_DataRoot, WipeReason); + + std::error_code Ec; + for (const std::filesystem::directory_entry& DirEntry : std::filesystem::directory_iterator{m_DataRoot, Ec}) + { + if (DirEntry.is_directory() && (DirEntry.path().filename() != "logs")) + { + ZEN_INFO("Deleting '{}'", DirEntry.path()); + + std::filesystem::remove_all(DirEntry.path(), Ec); + + if (Ec) + { + ZEN_WARN("Delete of '{}' returned error: '{}'", DirEntry.path(), Ec.message()); + } + } + } + + ZEN_INFO("Wiped all directories in data root"); + + UpdateManifest = true; + } + + if (UpdateManifest) + { + // Write new manifest + + const DateTime Now = DateTime::Now(); + + CbObjectWriter Cbo; + Cbo << "schema_version" << ZEN_SCHEMA_VERSION << "created" << Now << "updated" << Now << "state_id" << Oid::NewOid(); + Cbo << m_RootManifest["id"]; + + m_RootManifest = Cbo.Save(); + + WriteFile(ManifestPath, m_RootManifest.GetBuffer().AsIoBuffer()); + } +} + +void ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) { using namespace std::literals; @@ -800,6 +829,13 @@ main(int argc, char* argv[]) ZenServerOptions GlobalOptions; ZenServiceConfig ServiceConfig; ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); + + if (!std::filesystem::exists(GlobalOptions.DataDir)) + { + ServiceConfig.IsFirstRun = true; + std::filesystem::create_directories(GlobalOptions.DataDir); + } + InitializeLogging(GlobalOptions); #if ZEN_PLATFORM_WINDOWS |