aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-10-21 08:21:45 +0200
committerGitHub <[email protected]>2021-10-21 08:21:45 +0200
commita63dc510c62830382f243e965be45b705d396879 (patch)
tree63799eb1b05788c93a177a422a94af670aa77b84 /zenserver/projectstore.cpp
parentAdded missing include. (diff)
downloadzen-a63dc510c62830382f243e965be45b705d396879.tar.xz
zen-a63dc510c62830382f243e965be45b705d396879.zip
Compressed oplog attachments
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r--zenserver/projectstore.cpp217
1 files changed, 127 insertions, 90 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index d54d03450..73d61c124 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -272,9 +272,9 @@ private:
//////////////////////////////////////////////////////////////////////////
-ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath)
-: m_OuterProject(Outer)
-, m_CasStore(Store)
+ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Store, std::filesystem::path BasePath)
+: m_OuterProject(Project)
+, m_CidStore(Store)
, m_OplogId(Id)
, m_BasePath(BasePath)
{
@@ -319,7 +319,10 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId)
{
_.ReleaseNow();
- return m_CasStore.FindChunk(ChunkIt->second);
+ IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkIt->second);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+
+ return Chunk;
}
if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
@@ -328,14 +331,20 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId)
std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath;
- return IoBufferBuilder::MakeFromFile(FilePath.native().c_str());
+ IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath.native().c_str());
+ FileChunk.SetContentType(ZenContentType::kBinary);
+
+ return FileChunk;
}
if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
{
_.ReleaseNow();
- return m_CasStore.FindChunk(MetaIt->second);
+ IoBuffer Chunk = m_CidStore.FindChunkByCid(MetaIt->second);
+ Chunk.SetContentType(ZenContentType::kCompressedBinary);
+
+ return Chunk;
}
return {};
@@ -540,38 +549,16 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
for (const auto& Attach : Attachments)
{
- IoBuffer AttachmentData;
-
- if (Attach.IsBinary())
- {
- AttachmentData = Attach.AsBinary().AsIoBuffer();
- }
- else if (Attach.IsCompressedBinary())
- {
- ZEN_NOT_IMPLEMENTED("Compressed binary attachments are currently not supported for oplogs");
-
- AttachmentData = Attach.AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer();
- }
- else if (Attach.IsObject())
- {
- AttachmentData = Attach.AsObject().GetBuffer().AsIoBuffer();
- }
- else
- {
- ZEN_NOT_IMPLEMENTED("Unknown attachment type");
- }
-
- ZEN_ASSERT(AttachmentData);
+ ZEN_ASSERT(Attach.IsCompressedBinary());
- CasStore::InsertResult Result = m_CasStore.InsertChunk(AttachmentData, Attach.GetHash());
+ CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
+ const uint64_t AttachmentSize = AttachmentData.GetRawSize();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData);
- const uint64_t AttachmentSize = AttachmentData.Size();
-
- if (Result.New)
+ if (InsertResult.New)
{
NewAttachmentBytes += AttachmentSize;
}
-
AttachmentBytes += AttachmentSize;
}
@@ -587,9 +574,9 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
//////////////////////////////////////////////////////////////////////////
-ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath)
+ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath)
: m_ProjectStore(PrjStore)
-, m_CasStore(Store)
+, m_CidStore(Store)
, m_OplogStoragePath(BasePath)
{
}
@@ -678,7 +665,7 @@ ProjectStore::Project::NewOplog(std::string_view OplogId)
try
{
- Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second;
+ Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second;
return &Log;
}
@@ -718,7 +705,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId)
try
{
- Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second;
+ Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CidStore, OplogBasePath).first->second;
Log.ReplayLog();
@@ -787,10 +774,10 @@ ProjectStore::Project::Scrub(ScrubContext& Ctx)
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath)
: m_Log(zen::logging::Get("project"))
, m_ProjectBasePath(BasePath)
-, m_CasStore(Store)
+, m_CidStore(Store)
{
ZEN_INFO("initializing project store at '{}'", BasePath);
// m_Log.set_level(spdlog::level::debug);
@@ -855,7 +842,7 @@ ProjectStore::OpenProject(std::string_view ProjectId)
{
ZEN_INFO("opening project {} @ {}", ProjectId, ProjectBasePath);
- ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second;
+ ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, ProjectBasePath).first->second;
Prj.Identifier = ProjectId;
Prj.Read();
return &Prj;
@@ -879,7 +866,7 @@ ProjectStore::NewProject(std::filesystem::path BasePath,
{
RwLock::ExclusiveLockScope _(m_ProjectsLock);
- ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second;
+ ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CidStore, BasePath).first->second;
Prj.Identifier = ProjectId;
Prj.RootDir = RootDir;
Prj.EngineRootDir = EngineRootDir;
@@ -920,9 +907,9 @@ ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view Oplo
//////////////////////////////////////////////////////////////////////////
-HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
-: m_CasStore(Store)
-, m_Log(logging::Get("project"))
+HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
+: m_Log(logging::Get("project"))
+, m_CidStore(Store)
, m_ProjectStore(Projects)
{
using namespace std::literals;
@@ -1122,16 +1109,24 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
Oid Obj = Oid::FromHexString(ChunkId);
- IoBuffer Value = Log.FindChunk(Obj);
+ IoBuffer Chunk = Log.FindChunk(Obj);
+ if (!Chunk)
+ {
+ m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
- if (Value)
+ uint64_t ChunkSize = Chunk.GetSize();
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
{
- CbObjectWriter Response;
- Response << "size" << Value.Size();
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+ ChunkSize = Compressed.GetRawSize();
}
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ CbObjectWriter Response;
+ Response << "size" << ChunkSize;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
},
HttpVerb::kGet);
@@ -1176,50 +1171,81 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
}
}
- ZEN_DEBUG("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+ if (AcceptType == HttpContentType::kUnknownContentType)
+ {
+ AcceptType = HttpContentType::kBinary;
+ }
ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
if (FoundLog == nullptr)
{
+ m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId);
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
ProjectStore::Oplog& Log = *FoundLog;
+ Oid Obj = Oid::FromHexString(ChunkId);
- Oid Obj = Oid::FromHexString(ChunkId);
+ IoBuffer Chunk = Log.FindChunk(Obj);
+ if (!Chunk)
+ {
+ m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
- IoBuffer Value = Log.FindChunk(Obj);
+ IoBuffer Value = Chunk;
+ HttpContentType ContentType = Chunk.GetContentType();
- switch (HttpVerb Verb = HttpReq.RequestVerb())
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
{
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- if (!Value)
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+
+ if (IsOffset)
+ {
+ if ((Offset + Size) > Compressed.GetRawSize())
{
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ Size = Compressed.GetRawSize() - Offset;
}
- if (IsOffset)
+ if (AcceptType == HttpContentType::kBinary)
{
- if (Offset > Value.Size())
- {
- Offset = Value.Size();
- }
-
- if ((Offset + Size) > Value.Size())
- {
- Size = Value.Size() - Offset;
- }
-
- // Send only a subset of data
- IoBuffer InnerValue(Value, Offset, Size);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue);
+ Value = Compressed.Decompress(Offset, Size).AsIoBuffer();
+ ContentType = HttpContentType::kBinary;
}
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
+ else
+ {
+ Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ ContentType = HttpContentType::kCompressedBinary;
+ }
+ }
+ else
+ {
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ Value = Compressed.Decompress().AsIoBuffer();
+ ContentType = HttpContentType::kBinary;
+ }
+ else
+ {
+ Value = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ ContentType = HttpContentType::kCompressedBinary;
+ }
+ }
+ }
+ else if (IsOffset)
+ {
+ if ((Offset + Size) > Chunk.GetSize())
+ {
+ Size = Chunk.GetSize() - Offset;
+ }
+ Value = IoBuffer(Chunk, Offset, Size);
}
+
+ m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value);
},
HttpVerb::kGet | HttpVerb::kHead);
@@ -1228,20 +1254,31 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& HashString = Req.GetCapture(3);
+ const auto& HashString = Req.GetCapture(3);
+ IoHash Hash = IoHash::FromHexString(HashString);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
- ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString);
+ if (AcceptType == HttpContentType::kUnknownContentType)
+ {
+ AcceptType = HttpContentType::kBinary;
+ }
- IoHash Hash = IoHash::FromHexString(HashString);
+ HttpContentType ContentType = HttpContentType::kCompressedBinary;
+ IoBuffer Value = m_CidStore.FindChunkByCid(Hash);
- if (IoBuffer Value = m_CasStore.FindChunk(Hash))
+ if (!Value)
{
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value));
+ Value = Compressed.Decompress().AsIoBuffer();
+ ContentType = HttpContentType::kBinary;
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value);
},
HttpVerb::kGet);
@@ -1273,7 +1310,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
{
const IoHash FileHash = Entry.AsHash();
- if (!m_CasStore.FindChunk(FileHash))
+ if (!m_CidStore.FindChunkByCid(FileHash))
{
ZEN_DEBUG("prep - NEED: {}", FileHash);
@@ -1349,11 +1386,11 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString();
- if (IoBuffer Data = m_CasStore.FindChunk(Hash))
+ if (IoBuffer CompressedData = m_CidStore.FindChunkByCid(Hash))
{
- return SharedBuffer(std::move(Data));
+ return SharedBuffer(std::move(CompressedData));
}
- else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
+ else if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
{
return SharedBuffer(std::move(Data));
}
@@ -1403,7 +1440,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString());
+ ZEN_INFO("op #{} - '{}' {} '{}/{}' ", OpLsn, Core["key"sv].AsString(), NiceBytes(Payload.Size()), ProjectId, OplogId);
HttpReq.WriteResponse(HttpResponseCode::Created);
},
@@ -1473,7 +1510,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
}
- ZEN_INFO("established oplog {} / {}", ProjectId, OplogId);
+ ZEN_INFO("established oplog '{}/{}'", ProjectId, OplogId);
return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
}
@@ -1487,7 +1524,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
case HttpVerb::kDelete:
{
- ZEN_INFO("deleting oplog {}/{}", ProjectId, OplogId);
+ ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId);
ProjectIt->DeleteOplog(OplogId);