diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-18 11:35:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-11-18 02:35:13 -0800 |
| commit | 55225621f018904abf7e212320bb784dc64f8ac3 (patch) | |
| tree | 3fb962e9e0553448f9d42612bb078ff072308e1c /zenserver/projectstore.cpp | |
| parent | move BasicFile to zenutil to remove zenstore dependency from zen command (#190) (diff) | |
| download | zen-55225621f018904abf7e212320bb784dc64f8ac3.tar.xz zen-55225621f018904abf7e212320bb784dc64f8ac3.zip | |
Add `import-project` and `export-project` (#183)
* Add `import-project` and `export-project` command line parsing
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 837 |
1 files changed, 629 insertions, 208 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d46b312d3..6940583d1 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/compactbinaryvalue.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -15,6 +16,8 @@ #include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpshared.h> #include <zenstore/caslog.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> @@ -61,7 +64,7 @@ namespace { // We can't move our folder, probably because it is busy, bail.. return false; } - zen::Sleep(100); + Sleep(100); } while (true); } } // namespace @@ -77,7 +80,7 @@ OpKeyStringAsOId(std::string_view OpKey) Writer << "key"sv << OpKey; XXH3_128Stream KeyHasher; - Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); Oid OpId; @@ -142,7 +145,7 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t InvalidEntries = 0; m_Oplog.Replay( - [&](const zen::OplogEntry& LogEntry) { + [&](const OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) { ++InvalidEntries; @@ -209,6 +212,8 @@ struct ProjectStore::OplogStorage : public RefCounted { ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); + using namespace std::literals; + SharedBuffer Buffer = Op.GetBuffer(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); @@ -216,7 +221,7 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_ASSERT(WriteSize != 0); XXH3_128Stream KeyHasher; - Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); RwLock::ExclusiveLockScope _(m_RwLock); @@ -275,7 +280,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Stor m_TempPath = m_BasePath / "temp"sv; - zen::CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath); } ProjectStore::Oplog::~Oplog() @@ -476,6 +481,11 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, return false; } + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; @@ -599,9 +609,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) using namespace std::literals; - const CbObject& Core = OpPackage.GetObject(); - const OplogEntry OpEntry = m_Storage->AppendOp(Core); - // Persist attachments uint64_t AttachmentBytes = 0; @@ -624,12 +631,25 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) AttachmentBytes += AttachmentSize; } - const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); + const CbObject& Core = OpPackage.GetObject(); + const uint32_t EntryId = AppendNewOplogEntry(Core); + + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + + return EntryId; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", - EntryId, - zen::NiceBytes(NewAttachmentBytes), - zen::NiceBytes(AttachmentBytes)); + ZEN_ASSERT(m_Storage); + + using namespace std::literals; + + const OplogEntry OpEntry = m_Storage->AppendOp(Core); + const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); return EntryId; } @@ -672,11 +692,11 @@ ProjectStore::Project::Read() { CbObject Cfg = LoadCompactBinaryObject(Obj); - Identifier = Cfg["id"].AsString(); - RootDir = Cfg["root"].AsString(); - ProjectRootDir = Cfg["project"].AsString(); - EngineRootDir = Cfg["engine"].AsString(); - ProjectFilePath = Cfg["projectfile"].AsString(); + Identifier = Cfg["id"sv].AsString(); + RootDir = Cfg["root"sv].AsString(); + ProjectRootDir = Cfg["project"sv].AsString(); + EngineRootDir = Cfg["engine"sv].AsString(); + ProjectFilePath = Cfg["projectfile"sv].AsString(); } else { @@ -930,7 +950,7 @@ ProjectStore::Project::IsExpired() const ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) : GcStorage(Gc) , GcContributor(Gc) -, m_Log(zen::logging::Get("project")) +, m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) { @@ -1245,6 +1265,251 @@ ProjectStore::Exists(std::string_view ProjectId) return Project::Exists(BasePathForProject(ProjectId)); } +CbArray +ProjectStore::GetProjectsList() +{ + using namespace std::literals; + + DiscoverProjects(); + + CbWriter Response; + Response.BeginArray(); + + IterateProjects([&Response](ProjectStore::Project& Prj) { + Response.BeginObject(); + Response << "Id"sv << Prj.Identifier; + Response << "RootDir"sv << Prj.RootDir.string(); + Response << "ProjectRootDir"sv << Prj.ProjectRootDir; + Response << "EngineRootDir"sv << Prj.EngineRootDir; + Response << "ProjectFilePath"sv << Prj.ProjectFilePath; + Response.EndObject(); + }); + Response.EndArray(); + return Response.Save().AsArray(); +} + +HttpResponseCode +ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Project file request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Project file for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + + FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + Response.BeginObject(); + Response << "id"sv << Id; + Response << "clientpath"sv << ClientPath; + if (!FilterClient) + { + Response << "serverpath"sv << ServerPath; + } + Response.EndObject(); + }); + + Response.EndArray(); + OutPayload = Response.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Chunk info request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + ZEN_INFO("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId); + return HttpResponseCode::BadRequest; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpResponseCode::NotFound; + } + + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + ChunkSize = Compressed.GetRawSize(); + } + + CbObjectWriter Response; + Response << "size"sv << ChunkSize; + OutPayload = Response.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + bool IsOffset = Offset != 0 || Size != ~(0ull); + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Chunk request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + ZEN_INFO("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId); + return HttpResponseCode::BadRequest; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpResponseCode::NotFound; + } + + OutChunk = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); + + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + uint64_t RawSize = Compressed.GetRawSize(); + if ((Offset + Size) > RawSize) + { + Size = RawSize - Offset; + } + + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + // Value will be a range of compressed blocks that covers the requested range + // The client will have to compensate for any offsets that do not land on an even block size multiple + OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + OutChunk = IoBuffer(Chunk, Offset, Size); + OutChunk.SetContentType(ContentType); + } + + ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); + + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk) +{ + using namespace std::literals; + + if (Cid.length() != IoHash::StringLength) + { + ZEN_INFO("Chunk request for invalid chunk hash '{}'", Cid); + return HttpResponseCode::BadRequest; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + OutChunk = m_CidStore.FindChunkByCid(Hash); + + if (!OutChunk) + { + ZEN_DEBUG("chunk - '{}' MISSING", Cid); + return HttpResponseCode::NotFound; + } + + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk)); + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + return HttpResponseCode::OK; +} + ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) @@ -1264,34 +1529,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) // currently not possible for (arbitrary, external) technical reasons m_Router.RegisterRoute( "list", - [this](HttpRouterRequest& Req) { - m_ProjectStore->DiscoverProjects(); - - CbWriter Response; - Response.BeginArray(); - - m_ProjectStore->IterateProjects([&Response](ProjectStore::Project& Prj) { - Response.BeginObject(); - Response << "Id"sv << Prj.Identifier; - Response << "RootDir"sv << Prj.RootDir.string(); - Response << "ProjectRootDir"sv << Prj.ProjectRootDir; - Response << "EngineRootDir"sv << Prj.EngineRootDir; - Response << "ProjectFilePath"sv << Prj.ProjectFilePath; - Response.EndObject(); - }); - Response.EndArray(); - - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save().AsArray()); - }, + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/batch", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) @@ -1412,7 +1658,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); ResponsePtr += sizeof(ResponseChunk); } - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); }, HttpVerb::kPost); @@ -1427,42 +1672,17 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; - CbObjectWriter Response; - Response.BeginArray("files"); - - Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { - Response.BeginObject(); - Response << "id"sv << Id; - Response << "clientpath"sv << ClientPath; - if (!FilterClient) - { - Response << "serverpath"sv << ServerPath; - } - Response.EndObject(); - }); - - Response.EndArray(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + CbObject ResponsePayload; + HttpResponseCode Response = m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); + if (Response != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(Response); + } + return HttpReq.WriteResponse(Response, ResponsePayload); }, HttpVerb::kGet); @@ -1475,41 +1695,13 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - const Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = Log.FindChunk(Obj); - if (!Chunk) - { - m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - uint64_t ChunkSize = Chunk.GetSize(); - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + CbObject ResponsePayload; + HttpResponseCode Response = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); + if (Response != HttpResponseCode::OK) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - ZEN_ASSERT(!Compressed.IsNull()); - ChunkSize = Compressed.GetRawSize(); + return HttpReq.WriteResponse(Response); } - - CbObjectWriter Response; - Response << "size"sv << ChunkSize; - HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); }, HttpVerb::kGet); @@ -1522,9 +1714,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); - bool IsOffset = false; - uint64_t Offset = 0; - uint64_t Size = ~(0ull); + uint64_t Offset = 0; + uint64_t Size = ~(0ull); auto QueryParms = Req.ServerRequest().GetQueryParams(); @@ -1532,8 +1723,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) { - Offset = OffsetVal.value(); - IsOffset = true; + Offset = OffsetVal.value(); } else { @@ -1545,8 +1735,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) { - Size = SizeVal.value(); - IsOffset = true; + Size = SizeVal.value(); } else { @@ -1555,87 +1744,16 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) } HttpContentType AcceptType = HttpReq.AcceptContentType(); - if (AcceptType == HttpContentType::kUnknownContentType) - { - AcceptType = HttpContentType::kBinary; - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - m_Log.warn("chunk - '{}/{}/{}' FAILED, missing project", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = Log.FindChunk(Obj); - if (!Chunk) - { - m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - IoBuffer Value = Chunk; - HttpContentType ContentType = Chunk.GetContentType(); - - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - ZEN_ASSERT(!Compressed.IsNull()); - - if (IsOffset) - { - if ((Offset + Size) > Compressed.GetRawSize()) - { - Size = Compressed.GetRawSize() - Offset; - } - if (AcceptType == HttpContentType::kBinary) - { - Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); - ContentType = HttpContentType::kBinary; - } - else - { - Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); - ContentType = HttpContentType::kCompressedBinary; - } - } - else - { - if (AcceptType == HttpContentType::kBinary) - { - Value = Compressed.Decompress().AsIoBuffer(); - ContentType = HttpContentType::kBinary; - } - else - { - Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); - ContentType = HttpContentType::kCompressedBinary; - } - } - } - else if (IsOffset) + IoBuffer Chunk; + HttpResponseCode Response = m_ProjectStore->GetChunk(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); + if (Response != HttpResponseCode::OK) { - if ((Offset + Size) > Chunk.GetSize()) - { - Size = Chunk.GetSize() - Offset; - } - Value = IoBuffer(Chunk, Offset, Size); + return HttpReq.WriteResponse(Response); } - m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); - return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); + m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); + return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); }, HttpVerb::kGet | HttpVerb::kHead); @@ -1814,7 +1932,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); - zen::WriteFile(BadPackagePath, Payload); + WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } @@ -1872,7 +1990,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) ProjectStore::Oplog& Oplog = *FoundLog; - if (const std::optional<int32_t> OpId = zen::ParseInt<uint32_t>(OpIdString)) + if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) { if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) { @@ -1910,7 +2028,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) case ZenContentType::kCompressedBinary: if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) { - Package.AddAttachment(CbAttachment(Compressed)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); } else { @@ -1942,6 +2060,196 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) HttpVerb::kGet); m_Router.RegisterRoute( + "{project}/oplog/{log}/archive", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + CbObjectWriter Response; + Response.BeginArray("entries"sv); + std::unordered_set<IoHash> AttachementHashes; + size_t OpCount = 0; + IoHashStream Hasher; + + FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { + SharedBuffer Buffer = Op.GetBuffer(); + Hasher.Append(Buffer.GetView()); + Response << Op; + Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachementHashes.emplace(AttachmentHash); + }); + OpCount++; + }); + Response.EndArray(); + + IoHash Checksum = Hasher.GetHash(); + Response.AddHash("checksum"sv, Checksum); + + ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", + OpCount, + AttachementHashes.size(), + ProjectId, + OplogId, + Checksum); + + CbPackage ResponsePackage; + ResponsePackage.SetObject(Response.Save()); + + std::vector<CbAttachment> Attachments; + Attachments.reserve(AttachementHashes.size()); + for (const IoHash& AttachmentHash : AttachementHashes) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + if (Payload) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); + ZEN_ASSERT(Compressed); + Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); + } + } + ResponsePackage.AddAttachments(Attachments); + + std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); + const ZenContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == ZenContentType::kCompressedBinary) + { + std::vector<SharedBuffer> Parts; + Parts.reserve(ResponsePayload.size()); + for (const auto& I : ResponsePayload) + { + Parts.emplace_back(SharedBuffer(I)); + } + CompositeBuffer Cmp(std::move(Parts)); + CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); + HttpReq.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCompressedBinary, + CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); + } + else if (AcceptType == ZenContentType::kCbPackage) + { + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + break; + case HttpVerb::kPost: + { + ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); + IoBuffer CompressedPayload = HttpReq.ReadPayload(); + IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); + + CbPackage RequestPackage = ParsePackageMessage(Payload); + CbObject Request = RequestPackage.GetObject(); + IoHash Checksum = Request["checksum"sv].AsHash(); + + std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments(); + zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); + + ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", + Entries.Num(), + Attachments.size(), + Checksum, + ProjectId, + OplogId); + std::vector<CbObject> Ops; + Ops.reserve(Entries.Num()); + IoHashStream Hasher; + for (auto& OpEntry : Entries) + { + CbObjectView Core = OpEntry.AsObjectView(); + + if (!Core["key"sv]) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "No oplog entry key specified"); + } + + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + Ops.emplace_back(Op); + } + IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); + + // Spread over multiple threads? + WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); + std::atomic_int64_t JobCount = 0; + for (const CbAttachment& Attachment : Attachments) + { + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() { + CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); + JobCount.fetch_add(-1); + }); + } + while (JobCount.load()) + { + Sleep(1); + } + + ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); + for (const CbObject& Op : Ops) + { + const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); + ZEN_DEBUG("oplog entry #{}", OpLsn); + + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", + ProjectId, + OplogId, + OpLsn, + NiceBytes(Op.GetSize()), + Op["key"sv].AsString()); + } + ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); + return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + break; + default: + break; + } + }, + HttpVerb::kPost | HttpVerb::kGet); + m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); @@ -2120,7 +2428,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) std::vector<std::string> OpLogs = Project->ScanForOplogs(); CbObjectWriter Response; - Response << "id"sv << Project->Identifier << "root"sv << PathToUtf8(Project->RootDir); + Response << "id"sv << Project->Identifier; + Response << "root"sv << PathToUtf8(Project->RootDir); + Response << "engine"sv << PathToUtf8(Project->EngineRootDir); + Response << "project"sv << PathToUtf8(Project->ProjectRootDir); + Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); Response.BeginArray("oplogs"sv); for (const std::string& OplogId : OpLogs) @@ -2190,24 +2502,32 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request) namespace testutils { using namespace std::literals; - CbPackage CreateOplogPackage(const Oid& Id, const std::span<const CompressedBuffer>& Attachments) + std::string OidAsString(const Oid& Id) + { + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); + } + + CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; - Object << "key"sv << Id; + Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("bulkdata"); - for (const CompressedBuffer& Attachment : Attachments) + for (const auto& Attachment : Attachments) { + CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash())); Object.BeginObject(); - Object << "id" << Oid::NewOid(); - Object << "type" - << "Standard"; - Object << "data" << CbAttachment(Attachment); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; Object.EndObject(); - Package.AddAttachment(CbAttachment(Attachment)); + Package.AddAttachment(Attach); } Object.EndArray(); } @@ -2215,9 +2535,9 @@ namespace testutils { return Package; }; - std::vector<CompressedBuffer> CreateAttachments(const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) { - std::vector<CompressedBuffer> Result; + std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { @@ -2227,12 +2547,28 @@ namespace testutils { { Data[Idx] = Idx % 255; } - - Result.emplace_back(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()))); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); } return Result; } + uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + { + if (RawOffset > 0) + { + uint64 BlockSize = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + return 0; + } + return BlockSize > 0 ? RawOffset % BlockSize : 0; + } + return 0; + } + } // namespace testutils TEST_CASE("project.store.create") @@ -2391,6 +2727,91 @@ TEST_CASE("project.store.gc") } } +TEST_CASE("project.store.partial.read") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); + CHECK(Oplog != nullptr); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); + } + } + { + IoBuffer Chunk; + CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(), + HttpContentType::kCompressedBinary, + Chunk) == HttpResponseCode::OK); + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize()); + } + + IoBuffer ChunkResult; + CHECK(ProjectStore.GetChunk("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult) == HttpResponseCode::OK); + CHECK(ChunkResult); + CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize()); + + IoBuffer PartialChunkResult; + CHECK(ProjectStore.GetChunk("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult) == HttpResponseCode::OK); + CHECK(PartialChunkResult); + CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult)); + CHECK(PartialCompressedResult.GetRawSize() >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); +} #endif void |