diff options
| author | Per Larsson <[email protected]> | 2021-10-21 08:21:45 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-10-21 08:21:45 +0200 |
| commit | a63dc510c62830382f243e965be45b705d396879 (patch) | |
| tree | 63799eb1b05788c93a177a422a94af670aa77b84 | |
| parent | Added missing include. (diff) | |
| download | zen-a63dc510c62830382f243e965be45b705d396879.tar.xz zen-a63dc510c62830382f243e965be45b705d396879.zip | |
Compressed oplog attachments
| -rw-r--r-- | zencore/compactbinarypackage.cpp | 46 | ||||
| -rw-r--r-- | zencore/compress.cpp | 193 | ||||
| -rw-r--r-- | zencore/include/zencore/compress.h | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 2 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 217 | ||||
| -rw-r--r-- | zenserver/projectstore.h | 23 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 4 |
7 files changed, 377 insertions, 110 deletions
diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp index 88757d47f..f7ce371c8 100644 --- a/zencore/compactbinarypackage.cpp +++ b/zencore/compactbinarypackage.cpp @@ -635,6 +635,11 @@ namespace legacy { Writer.AddBinary(Attachment.AsBinary()); Writer.AddBinaryAttachment(Attachment.GetHash()); } + else if (Attachment.IsCompressedBinary()) + { + Writer.AddBinary(Attachment.AsCompressedBinary().GetCompressed()); + Writer.AddBinaryAttachment(Attachment.GetHash()); + } else if (Attachment.IsNull()) { Writer.AddBinary(MemoryView()); @@ -695,17 +700,32 @@ namespace legacy { SharedBuffer Buffer = SharedBuffer::MakeView(View, ValueField.GetOuterBuffer()).MakeOwned(); CbField HashField = LoadCompactBinary(Reader, Allocator); const IoHash& Hash = HashField.AsAttachment(); - if (HashField.HasError() || IoHash::HashBuffer(Buffer) != Hash) + if (HashField.HasError()) { return false; } - if (HashField.IsObjectAttachment()) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Buffer)) { - Package.AddAttachment(CbAttachment(CbObject(std::move(Buffer)), Hash)); + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Hash) + { + return false; + } + Package.AddAttachment(CbAttachment(Compressed)); } else { - Package.AddAttachment(CbAttachment(CompositeBuffer(std::move(Buffer)), Hash)); + if (IoHash::HashBuffer(Buffer) != Hash) + { + return false; + } + if (HashField.IsObjectAttachment()) + { + Package.AddAttachment(CbAttachment(CbObject(std::move(Buffer)), Hash)); + } + else + { + Package.AddAttachment(CbAttachment(CompositeBuffer(std::move(Buffer)), Hash)); + } } } } @@ -714,8 +734,22 @@ namespace legacy { const IoHash Hash = ValueField.AsHash(); ZEN_ASSERT(Mapper); - - Package.AddAttachment(CbAttachment((*Mapper)(Hash), Hash)); + if (SharedBuffer AttachmentData = (*Mapper)(Hash)) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(AttachmentData)) + { + Package.AddAttachment(CbAttachment(Compressed)); + } + else + { + const CbValidateError ValidationResult = ValidateCompactBinary(AttachmentData.GetView(), CbValidateMode::All); + if (ValidationResult != CbValidateError::None) + { + return false; + } + Package.AddAttachment(CbAttachment(CbObject(std::move(AttachmentData)), Hash)); + } + } } else { diff --git a/zencore/compress.cpp b/zencore/compress.cpp index 8d309e010..4a8d116fa 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -690,6 +690,90 @@ ValidBufferOrEmpty(BufferType&& CompressedData) return BufferHeader::IsValid(CompressedData) ? CompositeBuffer(std::forward<BufferType>(CompressedData)) : CompositeBuffer(); } +CompositeBuffer +CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize) +{ + if (Header.Method == CompressionMethod::None) + { + UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(RawSize); + CompressedData.CopyTo(NewCompressedData.GetMutableView(), sizeof(Header) + RawOffset); + + BufferHeader NewHeader = Header; + NewHeader.Crc32 = 0; + NewHeader.TotalRawSize = RawSize; + NewHeader.TotalCompressedSize = NewHeader.TotalRawSize + sizeof(BufferHeader); + NewHeader.RawHash = BLAKE3(); + + UniqueBuffer HeaderData = UniqueBuffer::Alloc(sizeof(BufferHeader)); + NewHeader.Write(HeaderData); + + return CompositeBuffer(HeaderData.MoveToShared(), NewCompressedData.MoveToShared()); + } + else + { + UniqueBuffer BlockSizeBuffer; + MemoryView BlockSizeView = + CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); + + const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; + const uint64_t LastBlockSize = BlockSize - ((Header.BlockCount * BlockSize) - Header.TotalRawSize); + const size_t FirstBlock = uint64_t(RawOffset / BlockSize); + const size_t LastBlock = uint64_t((RawOffset + RawSize - 1) / BlockSize); + uint64_t CompressedOffset = sizeof(BufferHeader) + uint64_t(Header.BlockCount) * sizeof(uint32_t); + + const uint64_t NewBlockCount = LastBlock - FirstBlock + 1; + const uint64_t NewMetaSize = NewBlockCount * sizeof(uint32_t); + uint64_t NewCompressedSize = 0; + uint64_t NewTotalRawSize = 0; + std::vector<uint32_t> NewCompressedBlockSizes; + + NewCompressedBlockSizes.reserve(NewBlockCount); + for (size_t BlockIndex = FirstBlock; BlockIndex <= LastBlock; ++BlockIndex) + { + const uint64_t UncompressedBlockSize = (BlockIndex == Header.BlockCount - 1) ? LastBlockSize : BlockSize; + NewTotalRawSize += UncompressedBlockSize; + + const uint32_t CompressedBlockSize = CompressedBlockSizes[BlockIndex]; + NewCompressedBlockSizes.push_back(CompressedBlockSize); + NewCompressedSize += ByteSwap(CompressedBlockSize); + } + + const uint64_t NewTotalCompressedSize = sizeof(BufferHeader) + NewBlockCount * sizeof(uint32_t) + NewCompressedSize; + UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(NewTotalCompressedSize); + MutableMemoryView NewCompressedBlocks = NewCompressedData.GetMutableView() + sizeof(BufferHeader) + NewMetaSize; + + // Seek to first compressed block + for (size_t BlockIndex = 0; BlockIndex < FirstBlock; ++BlockIndex) + { + const uint64_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + CompressedOffset += CompressedBlockSize; + } + + // Copy blocks + UniqueBuffer CompressedBlockCopy; + const MemoryView CompressedRange = CompressedData.ViewOrCopyRange(CompressedOffset, NewCompressedSize, CompressedBlockCopy); + NewCompressedBlocks.CopyFrom(CompressedRange); + + // Copy block sizes + NewCompressedData.GetMutableView().Mid(sizeof(BufferHeader), NewMetaSize).CopyFrom(MakeMemoryView(NewCompressedBlockSizes)); + + BufferHeader NewHeader; + NewHeader.Crc32 = 0; + NewHeader.Method = Header.Method; + NewHeader.Compressor = Header.Compressor; + NewHeader.CompressionLevel = Header.CompressionLevel; + NewHeader.BlockSizeExponent = Header.BlockSizeExponent; + NewHeader.BlockCount = static_cast<uint32_t>(NewBlockCount); + NewHeader.TotalRawSize = NewTotalRawSize; + NewHeader.TotalCompressedSize = NewTotalCompressedSize; + NewHeader.RawHash = BLAKE3(); + NewHeader.Write(NewCompressedData.GetMutableView().Left(sizeof(BufferHeader) + NewMetaSize)); + + return CompositeBuffer(NewCompressedData.MoveToShared()); + } +} + } // namespace zen::detail namespace zen { @@ -774,6 +858,17 @@ CompressedBuffer::GetRawHash() const return CompressedData ? detail::BufferHeader::Read(CompressedData).RawHash : BLAKE3(); } +CompressedBuffer +CompressedBuffer::CopyRange(uint64_t RawOffset, uint64_t RawSize) const +{ + using namespace detail; + const BufferHeader Header = BufferHeader::Read(CompressedData); + CompressedBuffer Range; + Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, RawSize); + + return Range; +} + bool CompressedBuffer::TryDecompressTo(MutableMemoryView RawView, uint64_t RawOffset) const { @@ -1040,6 +1135,104 @@ TEST_CASE("CompressedBuffer") ValidateData(Values, ExpectedValues, OffsetCount); } } + + SUBCASE("copy range") + { + const uint64_t BlockSize = 64 * sizeof(uint64_t); + const uint64_t N = 1000; + std::vector<uint64_t> ExpectedValues = GenerateData(N); + + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(MakeMemoryView(ExpectedValues)), + OodleCompressor::Mermaid, + OodleCompressionLevel::Optimal4, + BlockSize); + + { + const uint64_t OffsetCount = 0; + const uint64_t Count = N; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64; + const uint64_t Count = N - 64; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64 * 2 + 32; + const uint64_t Count = N - OffsetCount; + const uint64_t RawOffset = OffsetCount * sizeof(uint64_t); + const uint64_t RawSize = Count * sizeof(uint64_t); + uint64_t FirstBlockOffset = RawOffset % BlockSize; + + SharedBuffer Uncompressed = Compressed.CopyRange(RawOffset, RawSize).Decompress(); + std::span<uint64_t const> AllValues((const uint64_t*)Uncompressed.GetData(), RawSize / sizeof(uint64_t)); + std::span<uint64_t const> Values((const uint64_t*)(((const uint8_t*)(Uncompressed.GetData()) + FirstBlockOffset)), + RawSize / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 64 * 2 + 63; + const uint64_t Count = N - OffsetCount - 5; + const uint64_t RawOffset = OffsetCount * sizeof(uint64_t); + const uint64_t RawSize = Count * sizeof(uint64_t); + uint64_t FirstBlockOffset = RawOffset % BlockSize; + + SharedBuffer Uncompressed = Compressed.CopyRange(RawOffset, RawSize).Decompress(); + std::span<uint64_t const> AllValues((const uint64_t*)Uncompressed.GetData(), RawSize / sizeof(uint64_t)); + std::span<uint64_t const> Values((const uint64_t*)(((const uint8_t*)(Uncompressed.GetData()) + FirstBlockOffset)), + RawSize / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + } + + SUBCASE("copy uncompressed range") + { + const uint64_t BlockSize = 64 * sizeof(uint64_t); + const uint64_t N = 1000; + std::vector<uint64_t> ExpectedValues = GenerateData(N); + + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(MakeMemoryView(ExpectedValues)), + OodleCompressor::NotSet, + OodleCompressionLevel::None); + + { + const uint64_t OffsetCount = 0; + const uint64_t Count = N; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 1; + const uint64_t Count = N - OffsetCount; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + + { + const uint64_t OffsetCount = 42; + const uint64_t Count = 100; + SharedBuffer Uncompressed = Compressed.CopyRange(OffsetCount * sizeof(uint64_t), Count * sizeof(uint64_t)).Decompress(); + std::span<uint64_t const> Values((const uint64_t*)Uncompressed.GetData(), Uncompressed.GetSize() / sizeof(uint64_t)); + CHECK(Values.size() == Count); + ValidateData(Values, ExpectedValues, OffsetCount); + } + } } void diff --git a/zencore/include/zencore/compress.h b/zencore/include/zencore/compress.h index 426b4981a..d37ecfa79 100644 --- a/zencore/include/zencore/compress.h +++ b/zencore/include/zencore/compress.h @@ -105,6 +105,8 @@ public: /** Returns the hash of the raw data. Zero on error or if this is null. */ [[nodiscard]] ZENCORE_API BLAKE3 GetRawHash() const; + [[nodiscard]] ZENCORE_API CompressedBuffer CopyRange(uint64_t RawOffset, uint64_t RawSize = ~uint64_t(0)) const; + /** * Returns the compressor and compression level used by this buffer. * diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 6da60aa06..bc1404861 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1013,7 +1013,7 @@ TEST_CASE("project.basic") { uint8_t AttachData[] = {1, 2, 3}; - zen::CbAttachment Attach{zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})}; + zen::CbAttachment Attach{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}))}; zen::CbObjectWriter OpWriter; OpWriter << "key" diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d54d03450..73d61c124 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -272,9 +272,9 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath) -: m_OuterProject(Outer) -, m_CasStore(Store) +ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +: m_OuterProject(Project) +, m_CidStore(Store) , m_OplogId(Id) , m_BasePath(BasePath) { @@ -319,7 +319,10 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) { _.ReleaseNow(); - return m_CasStore.FindChunk(ChunkIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) @@ -328,14 +331,20 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; - return IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + FileChunk.SetContentType(ZenContentType::kBinary); + + return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { _.ReleaseNow(); - return m_CasStore.FindChunk(MetaIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(MetaIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } return {}; @@ -540,38 +549,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) for (const auto& Attach : Attachments) { - IoBuffer AttachmentData; - - if (Attach.IsBinary()) - { - AttachmentData = Attach.AsBinary().AsIoBuffer(); - } - else if (Attach.IsCompressedBinary()) - { - ZEN_NOT_IMPLEMENTED("Compressed binary attachments are currently not supported for oplogs"); - - AttachmentData = Attach.AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer(); - } - else if (Attach.IsObject()) - { - AttachmentData = Attach.AsObject().GetBuffer().AsIoBuffer(); - } - else - { - ZEN_NOT_IMPLEMENTED("Unknown attachment type"); - } - - ZEN_ASSERT(AttachmentData); + ZEN_ASSERT(Attach.IsCompressedBinary()); - CasStore::InsertResult Result = m_CasStore.InsertChunk(AttachmentData, Attach.GetHash()); + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.GetRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); - const uint64_t AttachmentSize = AttachmentData.Size(); - - if (Result.New) + if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } - AttachmentBytes += AttachmentSize; } @@ -587,9 +574,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) ////////////////////////////////////////////////////////////////////////// -ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath) +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) -, m_CasStore(Store) +, m_CidStore(Store) , m_OplogStoragePath(BasePath) { } @@ -678,7 +665,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; return &Log; } @@ -718,7 +705,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; Log.ReplayLog(); @@ -787,10 +774,10 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath) : m_Log(zen::logging::Get("project")) , m_ProjectBasePath(BasePath) -, m_CasStore(Store) +, m_CidStore(Store) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); @@ -855,7 +842,7 @@ ProjectStore::OpenProject(std::string_view ProjectId) { ZEN_INFO("opening project {} @ {}", ProjectId, ProjectBasePath); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, ProjectBasePath).first->second; Prj.Identifier = ProjectId; Prj.Read(); return &Prj; @@ -879,7 +866,7 @@ ProjectStore::NewProject(std::filesystem::path BasePath, { RwLock::ExclusiveLockScope _(m_ProjectsLock); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, BasePath).first->second; Prj.Identifier = ProjectId; Prj.RootDir = RootDir; Prj.EngineRootDir = EngineRootDir; @@ -920,9 +907,9 @@ ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view Oplo ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) -: m_CasStore(Store) -, m_Log(logging::Get("project")) +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +: m_Log(logging::Get("project")) +, m_CidStore(Store) , m_ProjectStore(Projects) { using namespace std::literals; @@ -1122,16 +1109,24 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) Oid Obj = Oid::FromHexString(ChunkId); - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - if (Value) + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - CbObjectWriter Response; - Response << "size" << Value.Size(); - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + ChunkSize = Compressed.GetRawSize(); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + CbObjectWriter Response; + Response << "size" << ChunkSize; + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); @@ -1176,50 +1171,81 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) } } - ZEN_DEBUG("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { + m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; + Oid Obj = Oid::FromHexString(ChunkId); - Oid Obj = Oid::FromHexString(ChunkId); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Value = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); - switch (HttpVerb Verb = HttpReq.RequestVerb()) + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - case HttpVerb::kHead: - case HttpVerb::kGet: - if (!Value) + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + if ((Offset + Size) > Compressed.GetRawSize()) { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + Size = Compressed.GetRawSize() - Offset; } - if (IsOffset) + if (AcceptType == HttpContentType::kBinary) { - if (Offset > Value.Size()) - { - Offset = Value.Size(); - } - - if ((Offset + Size) > Value.Size()) - { - Size = Value.Size() - Offset; - } - - // Send only a subset of data - IoBuffer InnerValue(Value, Offset, Size); - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue); + Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); + ContentType = HttpContentType::kBinary; } - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + else + { + Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + else + { + Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + Value = IoBuffer(Chunk, Offset, Size); } + + m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet | HttpVerb::kHead); @@ -1228,20 +1254,31 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& HashString = Req.GetCapture(3); + const auto& HashString = Req.GetCapture(3); + IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType AcceptType = HttpReq.AcceptContentType(); - ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } - IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType ContentType = HttpContentType::kCompressedBinary; + IoBuffer Value = m_CidStore.FindChunkByCid(Hash); - if (IoBuffer Value = m_CasStore.FindChunk(Hash)) + if (!Value) { - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet); @@ -1273,7 +1310,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) { const IoHash FileHash = Entry.AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.FindChunkByCid(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); @@ -1349,11 +1386,11 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); - if (IoBuffer Data = m_CasStore.FindChunk(Hash)) + if (IoBuffer CompressedData = m_CidStore.FindChunkByCid(Hash)) { - return SharedBuffer(std::move(Data)); + return SharedBuffer(std::move(CompressedData)); } - else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + else if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { return SharedBuffer(std::move(Data)); } @@ -1403,7 +1440,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId); HttpReq.WriteResponse(HttpResponseCode::Created); }, @@ -1473,7 +1510,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog {} / {}", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -1487,7 +1524,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) case HttpVerb::kDelete: { - ZEN_INFO("deleting oplog {}/{}", ProjectId, OplogId); + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); ProjectIt->DeleteOplog(OplogId); diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index e545d78b9..c9f49217a 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -7,6 +7,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/cas.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <tsl/robin_map.h> #include <zencore/logging.h> @@ -50,14 +51,14 @@ class ProjectStore : public RefCounted struct OplogStorage; public: - ProjectStore(CasStore& Store, std::filesystem::path BasePath); + ProjectStore(CidStore& Store, std::filesystem::path BasePath); ~ProjectStore(); struct Project; struct Oplog { - Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath); + Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath); ~Oplog(); [[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath); @@ -116,7 +117,7 @@ public: using OidMap = tsl::robin_map<Oid, V, Oid::Hasher>; Project* m_OuterProject = nullptr; - CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_BasePath; std::filesystem::path m_TempPath; @@ -148,7 +149,7 @@ public: void DeleteOplog(std::string_view OplogId); void IterateOplogs(std::function<void(const Oplog&)>&& Fn) const; - Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath); + Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); ~Project(); void Read(); @@ -160,7 +161,7 @@ public: private: ProjectStore* m_ProjectStore; - CasStore& m_CasStore; + CidStore& m_CidStore; mutable RwLock m_ProjectLock; std::map<std::string, Oplog> m_Oplogs; std::filesystem::path m_OplogStoragePath; @@ -186,7 +187,7 @@ public: private: spdlog::logger& m_Log; - CasStore& m_CasStore; + CidStore& m_CidStore; std::filesystem::path m_ProjectBasePath; RwLock m_ProjectsLock; std::map<std::string, Project> m_Projects; @@ -216,19 +217,19 @@ private: class HttpProjectService : public HttpService { public: - HttpProjectService(CasStore& Store, ProjectStore* Projects); + HttpProjectService(CidStore& Store, ProjectStore* InProjectStore); ~HttpProjectService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; private: - CasStore& m_CasStore; + inline spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; - - inline spdlog::logger& Log() { return m_Log; } }; /** Project store interface for local clients @@ -242,7 +243,7 @@ private: class LocalProjectService : public RefCounted { protected: - LocalProjectService(CasStore& Store, ProjectStore* Projects); + LocalProjectService(CasStore& CasStore, ProjectStore* Projects); ~LocalProjectService(); public: diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 269db7394..f030694db 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -251,8 +251,8 @@ public: ZEN_INFO("instantiating project service"); - m_ProjectStore = new zen::ProjectStore(*m_CasStore, m_DataRoot / "projects"); - m_HttpProjectService.reset(new zen::HttpProjectService{*m_CasStore, m_ProjectStore}); + m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects"); + m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore}); m_LocalProjectService = zen::LocalProjectService::New(*m_CasStore, m_ProjectStore); ZEN_INFO("instantiating compute services"); |