diff options
| author | Per Larsson <[email protected]> | 2021-10-21 08:21:45 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-10-21 08:21:45 +0200 |
| commit | a63dc510c62830382f243e965be45b705d396879 (patch) | |
| tree | 63799eb1b05788c93a177a422a94af670aa77b84 /zenserver/projectstore.cpp | |
| parent | Added missing include. (diff) | |
| download | zen-a63dc510c62830382f243e965be45b705d396879.tar.xz zen-a63dc510c62830382f243e965be45b705d396879.zip | |
Compressed oplog attachments
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 217 |
1 files changed, 127 insertions, 90 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d54d03450..73d61c124 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -272,9 +272,9 @@ private: ////////////////////////////////////////////////////////////////////////// -ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath) -: m_OuterProject(Outer) -, m_CasStore(Store) +ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath) +: m_OuterProject(Project) +, m_CidStore(Store) , m_OplogId(Id) , m_BasePath(BasePath) { @@ -319,7 +319,10 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) { _.ReleaseNow(); - return m_CasStore.FindChunk(ChunkIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end()) @@ -328,14 +331,20 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath; - return IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath.native().c_str()); + FileChunk.SetContentType(ZenContentType::kBinary); + + return FileChunk; } if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end()) { _.ReleaseNow(); - return m_CasStore.FindChunk(MetaIt->second); + IoBuffer Chunk = m_CidStore.FindChunkByCid(MetaIt->second); + Chunk.SetContentType(ZenContentType::kCompressedBinary); + + return Chunk; } return {}; @@ -540,38 +549,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) for (const auto& Attach : Attachments) { - IoBuffer AttachmentData; - - if (Attach.IsBinary()) - { - AttachmentData = Attach.AsBinary().AsIoBuffer(); - } - else if (Attach.IsCompressedBinary()) - { - ZEN_NOT_IMPLEMENTED("Compressed binary attachments are currently not supported for oplogs"); - - AttachmentData = Attach.AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer(); - } - else if (Attach.IsObject()) - { - AttachmentData = Attach.AsObject().GetBuffer().AsIoBuffer(); - } - else - { - ZEN_NOT_IMPLEMENTED("Unknown attachment type"); - } - - ZEN_ASSERT(AttachmentData); + ZEN_ASSERT(Attach.IsCompressedBinary()); - CasStore::InsertResult Result = m_CasStore.InsertChunk(AttachmentData, Attach.GetHash()); + CompressedBuffer AttachmentData = Attach.AsCompressedBinary(); + const uint64_t AttachmentSize = AttachmentData.GetRawSize(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData); - const uint64_t AttachmentSize = AttachmentData.Size(); - - if (Result.New) + if (InsertResult.New) { NewAttachmentBytes += AttachmentSize; } - AttachmentBytes += AttachmentSize; } @@ -587,9 +574,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) ////////////////////////////////////////////////////////////////////////// -ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath) +ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath) : m_ProjectStore(PrjStore) -, m_CasStore(Store) +, m_CidStore(Store) , m_OplogStoragePath(BasePath) { } @@ -678,7 +665,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; return &Log; } @@ -718,7 +705,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) try { - Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second; + Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second; Log.ReplayLog(); @@ -787,10 +774,10 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx) ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) +ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath) : m_Log(zen::logging::Get("project")) , m_ProjectBasePath(BasePath) -, m_CasStore(Store) +, m_CidStore(Store) { ZEN_INFO("initializing project store at '{}'", BasePath); // m_Log.set_level(spdlog::level::debug); @@ -855,7 +842,7 @@ ProjectStore::OpenProject(std::string_view ProjectId) { ZEN_INFO("opening project {} @ {}", ProjectId, ProjectBasePath); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, ProjectBasePath).first->second; Prj.Identifier = ProjectId; Prj.Read(); return &Prj; @@ -879,7 +866,7 @@ ProjectStore::NewProject(std::filesystem::path BasePath, { RwLock::ExclusiveLockScope _(m_ProjectsLock); - ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second; + ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, BasePath).first->second; Prj.Identifier = ProjectId; Prj.RootDir = RootDir; Prj.EngineRootDir = EngineRootDir; @@ -920,9 +907,9 @@ ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view Oplo ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) -: m_CasStore(Store) -, m_Log(logging::Get("project")) +HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) +: m_Log(logging::Get("project")) +, m_CidStore(Store) , m_ProjectStore(Projects) { using namespace std::literals; @@ -1122,16 +1109,24 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) Oid Obj = Oid::FromHexString(ChunkId); - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - if (Value) + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - CbObjectWriter Response; - Response << "size" << Value.Size(); - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + ChunkSize = Compressed.GetRawSize(); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + CbObjectWriter Response; + Response << "size" << ChunkSize; + HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); }, HttpVerb::kGet); @@ -1176,50 +1171,81 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) } } - ZEN_DEBUG("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId); + HttpContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); if (FoundLog == nullptr) { + m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); return HttpReq.WriteResponse(HttpResponseCode::NotFound); } ProjectStore::Oplog& Log = *FoundLog; + Oid Obj = Oid::FromHexString(ChunkId); - Oid Obj = Oid::FromHexString(ChunkId); + IoBuffer Chunk = Log.FindChunk(Obj); + if (!Chunk) + { + m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } - IoBuffer Value = Log.FindChunk(Obj); + IoBuffer Value = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); - switch (HttpVerb Verb = HttpReq.RequestVerb()) + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) { - case HttpVerb::kHead: - case HttpVerb::kGet: - if (!Value) + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + if ((Offset + Size) > Compressed.GetRawSize()) { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + Size = Compressed.GetRawSize() - Offset; } - if (IsOffset) + if (AcceptType == HttpContentType::kBinary) { - if (Offset > Value.Size()) - { - Offset = Value.Size(); - } - - if ((Offset + Size) > Value.Size()) - { - Size = Value.Size() - Offset; - } - - // Send only a subset of data - IoBuffer InnerValue(Value, Offset, Size); - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue); + Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); + ContentType = HttpContentType::kBinary; } - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + else + { + Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + else + { + Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); + ContentType = HttpContentType::kCompressedBinary; + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + Value = IoBuffer(Chunk, Offset, Size); } + + m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet | HttpVerb::kHead); @@ -1228,20 +1254,31 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - const auto& HashString = Req.GetCapture(3); + const auto& HashString = Req.GetCapture(3); + IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType AcceptType = HttpReq.AcceptContentType(); - ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString); + if (AcceptType == HttpContentType::kUnknownContentType) + { + AcceptType = HttpContentType::kBinary; + } - IoHash Hash = IoHash::FromHexString(HashString); + HttpContentType ContentType = HttpContentType::kCompressedBinary; + IoBuffer Value = m_CidStore.FindChunkByCid(Hash); - if (IoBuffer Value = m_CasStore.FindChunk(Hash)) + if (!Value) { - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); + Value = Compressed.Decompress().AsIoBuffer(); + ContentType = HttpContentType::kBinary; + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); }, HttpVerb::kGet); @@ -1273,7 +1310,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) { const IoHash FileHash = Entry.AsHash(); - if (!m_CasStore.FindChunk(FileHash)) + if (!m_CidStore.FindChunkByCid(FileHash)) { ZEN_DEBUG("prep - NEED: {}", FileHash); @@ -1349,11 +1386,11 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); - if (IoBuffer Data = m_CasStore.FindChunk(Hash)) + if (IoBuffer CompressedData = m_CidStore.FindChunkByCid(Hash)) { - return SharedBuffer(std::move(Data)); + return SharedBuffer(std::move(CompressedData)); } - else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) + else if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { return SharedBuffer(std::move(Data)); } @@ -1403,7 +1440,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId); HttpReq.WriteResponse(HttpResponseCode::Created); }, @@ -1473,7 +1510,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError); } - ZEN_INFO("established oplog {} / {}", ProjectId, OplogId); + ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } @@ -1487,7 +1524,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) case HttpVerb::kDelete: { - ZEN_INFO("deleting oplog {}/{}", ProjectId, OplogId); + ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId); ProjectIt->DeleteOplog(OplogId); |