aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-11-18 11:35:13 +0100
committerGitHub <[email protected]>2022-11-18 02:35:13 -0800
commit55225621f018904abf7e212320bb784dc64f8ac3 (patch)
tree3fb962e9e0553448f9d42612bb078ff072308e1c /zenserver/projectstore.cpp
parentmove BasicFile to zenutil to remove zenstore dependency from zen command (#190) (diff)
downloadzen-55225621f018904abf7e212320bb784dc64f8ac3.tar.xz
zen-55225621f018904abf7e212320bb784dc64f8ac3.zip
Add `import-project` and `export-project` (#183)
* Add `import-project` and `export-project` command line parsing
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r--zenserver/projectstore.cpp837
1 files changed, 629 insertions, 208 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index d46b312d3..6940583d1 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compactbinaryvalue.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -15,6 +16,8 @@
#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
@@ -61,7 +64,7 @@ namespace {
// We can't move our folder, probably because it is busy, bail..
return false;
}
- zen::Sleep(100);
+ Sleep(100);
} while (true);
}
} // namespace
@@ -77,7 +80,7 @@ OpKeyStringAsOId(std::string_view OpKey)
Writer << "key"sv << OpKey;
XXH3_128Stream KeyHasher;
- Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
Oid OpId;
@@ -142,7 +145,7 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t InvalidEntries = 0;
m_Oplog.Replay(
- [&](const zen::OplogEntry& LogEntry) {
+ [&](const OplogEntry& LogEntry) {
if (LogEntry.OpCoreSize == 0)
{
++InvalidEntries;
@@ -209,6 +212,8 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp");
+ using namespace std::literals;
+
SharedBuffer Buffer = Op.GetBuffer();
const uint64_t WriteSize = Buffer.GetSize();
const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
@@ -216,7 +221,7 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_ASSERT(WriteSize != 0);
XXH3_128Stream KeyHasher;
- Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
RwLock::ExclusiveLockScope _(m_RwLock);
@@ -275,7 +280,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Stor
m_TempPath = m_BasePath / "temp"sv;
- zen::CleanDirectory(m_TempPath);
+ CleanDirectory(m_TempPath);
}
ProjectStore::Oplog::~Oplog()
@@ -476,6 +481,11 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
return false;
}
+ if (Hash != IoHash::Zero)
+ {
+ m_ChunkMap.insert_or_assign(FileId, Hash);
+ }
+
FileMapEntry Entry;
Entry.ServerPath = ServerPath;
Entry.ClientPath = ClientPath;
@@ -599,9 +609,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
using namespace std::literals;
- const CbObject& Core = OpPackage.GetObject();
- const OplogEntry OpEntry = m_Storage->AppendOp(Core);
-
// Persist attachments
uint64_t AttachmentBytes = 0;
@@ -624,12 +631,25 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
AttachmentBytes += AttachmentSize;
}
- const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
+ const CbObject& Core = OpPackage.GetObject();
+ const uint32_t EntryId = AppendNewOplogEntry(Core);
+
+ ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
+
+ return EntryId;
+}
+
+uint32_t
+ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total",
- EntryId,
- zen::NiceBytes(NewAttachmentBytes),
- zen::NiceBytes(AttachmentBytes));
+ ZEN_ASSERT(m_Storage);
+
+ using namespace std::literals;
+
+ const OplogEntry OpEntry = m_Storage->AppendOp(Core);
+ const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
return EntryId;
}
@@ -672,11 +692,11 @@ ProjectStore::Project::Read()
{
CbObject Cfg = LoadCompactBinaryObject(Obj);
- Identifier = Cfg["id"].AsString();
- RootDir = Cfg["root"].AsString();
- ProjectRootDir = Cfg["project"].AsString();
- EngineRootDir = Cfg["engine"].AsString();
- ProjectFilePath = Cfg["projectfile"].AsString();
+ Identifier = Cfg["id"sv].AsString();
+ RootDir = Cfg["root"sv].AsString();
+ ProjectRootDir = Cfg["project"sv].AsString();
+ EngineRootDir = Cfg["engine"sv].AsString();
+ ProjectFilePath = Cfg["projectfile"sv].AsString();
}
else
{
@@ -930,7 +950,7 @@ ProjectStore::Project::IsExpired() const
ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
: GcStorage(Gc)
, GcContributor(Gc)
-, m_Log(zen::logging::Get("project"))
+, m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectBasePath(BasePath)
{
@@ -1245,6 +1265,251 @@ ProjectStore::Exists(std::string_view ProjectId)
return Project::Exists(BasePathForProject(ProjectId));
}
+CbArray
+ProjectStore::GetProjectsList()
+{
+ using namespace std::literals;
+
+ DiscoverProjects();
+
+ CbWriter Response;
+ Response.BeginArray();
+
+ IterateProjects([&Response](ProjectStore::Project& Prj) {
+ Response.BeginObject();
+ Response << "Id"sv << Prj.Identifier;
+ Response << "RootDir"sv << Prj.RootDir.string();
+ Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
+ Response << "EngineRootDir"sv << Prj.EngineRootDir;
+ Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
+ Response.EndObject();
+ });
+ Response.EndArray();
+ return Response.Save().AsArray();
+}
+
+HttpResponseCode
+ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Project file request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Project file for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+
+ CbObjectWriter Response;
+ Response.BeginArray("files"sv);
+
+ FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+ Response.BeginObject();
+ Response << "id"sv << Id;
+ Response << "clientpath"sv << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath"sv << ServerPath;
+ }
+ Response.EndObject();
+ });
+
+ Response.EndArray();
+ OutPayload = Response.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunkInfo(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Chunk info request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ ZEN_INFO("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::NotFound;
+ }
+
+ uint64_t ChunkSize = Chunk.GetSize();
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+ ChunkSize = Compressed.GetRawSize();
+ }
+
+ CbObjectWriter Response;
+ Response << "size"sv << ChunkSize;
+ OutPayload = Response.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk)
+{
+ bool IsOffset = Offset != 0 || Size != ~(0ull);
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Chunk request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ ZEN_INFO("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::NotFound;
+ }
+
+ OutChunk = Chunk;
+ HttpContentType ContentType = Chunk.GetContentType();
+
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+
+ if (IsOffset)
+ {
+ uint64_t RawSize = Compressed.GetRawSize();
+ if ((Offset + Size) > RawSize)
+ {
+ Size = RawSize - Offset;
+ }
+
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ // Value will be a range of compressed blocks that covers the requested range
+ // The client will have to compensate for any offsets that do not land on an even block size multiple
+ OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ else
+ {
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ }
+ else if (IsOffset)
+ {
+ if ((Offset + Size) > Chunk.GetSize())
+ {
+ Size = Chunk.GetSize() - Offset;
+ }
+ OutChunk = IoBuffer(Chunk, Offset, Size);
+ OutChunk.SetContentType(ContentType);
+ }
+
+ ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
+
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk)
+{
+ using namespace std::literals;
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ ZEN_INFO("Chunk request for invalid chunk hash '{}'", Cid);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+ OutChunk = m_CidStore.FindChunkByCid(Hash);
+
+ if (!OutChunk)
+ {
+ ZEN_DEBUG("chunk - '{}' MISSING", Cid);
+ return HttpResponseCode::NotFound;
+ }
+
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk));
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ return HttpResponseCode::OK;
+}
+
//////////////////////////////////////////////////////////////////////////
HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
@@ -1264,34 +1529,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
// currently not possible for (arbitrary, external) technical reasons
m_Router.RegisterRoute(
"list",
- [this](HttpRouterRequest& Req) {
- m_ProjectStore->DiscoverProjects();
-
- CbWriter Response;
- Response.BeginArray();
-
- m_ProjectStore->IterateProjects([&Response](ProjectStore::Project& Prj) {
- Response.BeginObject();
- Response << "Id"sv << Prj.Identifier;
- Response << "RootDir"sv << Prj.RootDir.string();
- Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
- Response << "EngineRootDir"sv << Prj.EngineRootDir;
- Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
- Response.EndObject();
- });
- Response.EndArray();
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save().AsArray());
- },
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
HttpVerb::kGet);
m_Router.RegisterRoute(
"{project}/oplog/{log}/batch",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
@@ -1412,7 +1658,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
ResponsePtr += sizeof(ResponseChunk);
}
-
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
},
HttpVerb::kPost);
@@ -1427,42 +1672,17 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
-
HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
const bool FilterClient = Params.GetValue("filter"sv) == "client"sv;
- CbObjectWriter Response;
- Response.BeginArray("files");
-
- Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
- Response.BeginObject();
- Response << "id"sv << Id;
- Response << "clientpath"sv << ClientPath;
- if (!FilterClient)
- {
- Response << "serverpath"sv << ServerPath;
- }
- Response.EndObject();
- });
-
- Response.EndArray();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ CbObject ResponsePayload;
+ HttpResponseCode Response = m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload);
+ if (Response != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(Response);
+ }
+ return HttpReq.WriteResponse(Response, ResponsePayload);
},
HttpVerb::kGet);
@@ -1475,41 +1695,13 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& OplogId = Req.GetCapture(2);
const auto& ChunkId = Req.GetCapture(3);
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
-
- const Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = Log.FindChunk(Obj);
- if (!Chunk)
- {
- m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- uint64_t ChunkSize = Chunk.GetSize();
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ CbObject ResponsePayload;
+ HttpResponseCode Response = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload);
+ if (Response != HttpResponseCode::OK)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- ZEN_ASSERT(!Compressed.IsNull());
- ChunkSize = Compressed.GetRawSize();
+ return HttpReq.WriteResponse(Response);
}
-
- CbObjectWriter Response;
- Response << "size"sv << ChunkSize;
- HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
},
HttpVerb::kGet);
@@ -1522,9 +1714,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& OplogId = Req.GetCapture(2);
const auto& ChunkId = Req.GetCapture(3);
- bool IsOffset = false;
- uint64_t Offset = 0;
- uint64_t Size = ~(0ull);
+ uint64_t Offset = 0;
+ uint64_t Size = ~(0ull);
auto QueryParms = Req.ServerRequest().GetQueryParams();
@@ -1532,8 +1723,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
{
- Offset = OffsetVal.value();
- IsOffset = true;
+ Offset = OffsetVal.value();
}
else
{
@@ -1545,8 +1735,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
{
- Size = SizeVal.value();
- IsOffset = true;
+ Size = SizeVal.value();
}
else
{
@@ -1555,87 +1744,16 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
}
HttpContentType AcceptType = HttpReq.AcceptContentType();
- if (AcceptType == HttpContentType::kUnknownContentType)
- {
- AcceptType = HttpContentType::kBinary;
- }
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- m_Log.warn("chunk - '{}/{}/{}' FAILED, missing project", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
- Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = Log.FindChunk(Obj);
- if (!Chunk)
- {
- m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- IoBuffer Value = Chunk;
- HttpContentType ContentType = Chunk.GetContentType();
-
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- ZEN_ASSERT(!Compressed.IsNull());
-
- if (IsOffset)
- {
- if ((Offset + Size) > Compressed.GetRawSize())
- {
- Size = Compressed.GetRawSize() - Offset;
- }
- if (AcceptType == HttpContentType::kBinary)
- {
- Value = Compressed.Decompress(Offset, Size).AsIoBuffer();
- ContentType = HttpContentType::kBinary;
- }
- else
- {
- Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
- ContentType = HttpContentType::kCompressedBinary;
- }
- }
- else
- {
- if (AcceptType == HttpContentType::kBinary)
- {
- Value = Compressed.Decompress().AsIoBuffer();
- ContentType = HttpContentType::kBinary;
- }
- else
- {
- Value = Compressed.GetCompressed().Flatten().AsIoBuffer();
- ContentType = HttpContentType::kCompressedBinary;
- }
- }
- }
- else if (IsOffset)
+ IoBuffer Chunk;
+ HttpResponseCode Response = m_ProjectStore->GetChunk(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk);
+ if (Response != HttpResponseCode::OK)
{
- if ((Offset + Size) > Chunk.GetSize())
- {
- Size = Chunk.GetSize() - Offset;
- }
- Value = IoBuffer(Chunk, Offset, Size);
+ return HttpReq.WriteResponse(Response);
}
- m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
- return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value);
+ m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType()));
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk);
},
HttpVerb::kGet | HttpVerb::kHead);
@@ -1814,7 +1932,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath);
- zen::WriteFile(BadPackagePath, Payload);
+ WriteFile(BadPackagePath, Payload);
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
}
@@ -1872,7 +1990,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
ProjectStore::Oplog& Oplog = *FoundLog;
- if (const std::optional<int32_t> OpId = zen::ParseInt<uint32_t>(OpIdString))
+ if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
{
if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
{
@@ -1910,7 +2028,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
case ZenContentType::kCompressedBinary:
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
{
- Package.AddAttachment(CbAttachment(Compressed));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
}
else
{
@@ -1942,6 +2060,196 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/archive",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbObjectWriter Response;
+ Response.BeginArray("entries"sv);
+ std::unordered_set<IoHash> AttachementHashes;
+ size_t OpCount = 0;
+ IoHashStream Hasher;
+
+ FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) {
+ SharedBuffer Buffer = Op.GetBuffer();
+ Hasher.Append(Buffer.GetView());
+ Response << Op;
+ Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachementHashes.emplace(AttachmentHash);
+ });
+ OpCount++;
+ });
+ Response.EndArray();
+
+ IoHash Checksum = Hasher.GetHash();
+ Response.AddHash("checksum"sv, Checksum);
+
+ ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'",
+ OpCount,
+ AttachementHashes.size(),
+ ProjectId,
+ OplogId,
+ Checksum);
+
+ CbPackage ResponsePackage;
+ ResponsePackage.SetObject(Response.Save());
+
+ std::vector<CbAttachment> Attachments;
+ Attachments.reserve(AttachementHashes.size());
+ for (const IoHash& AttachmentHash : AttachementHashes)
+ {
+ IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
+ if (Payload)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
+ ZEN_ASSERT(Compressed);
+ Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash));
+ }
+ }
+ ResponsePackage.AddAttachments(Attachments);
+
+ std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences);
+ const ZenContentType AcceptType = HttpReq.AcceptContentType();
+ if (AcceptType == ZenContentType::kCompressedBinary)
+ {
+ std::vector<SharedBuffer> Parts;
+ Parts.reserve(ResponsePayload.size());
+ for (const auto& I : ResponsePayload)
+ {
+ Parts.emplace_back(SharedBuffer(I));
+ }
+ CompositeBuffer Cmp(std::move(Parts));
+ CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp);
+ HttpReq.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCompressedBinary,
+ CompressedResponse.GetCompressed().Flatten().AsIoBuffer());
+ }
+ else if (AcceptType == ZenContentType::kCbPackage)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+ break;
+ case HttpVerb::kPost:
+ {
+ ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId);
+ IoBuffer CompressedPayload = HttpReq.ReadPayload();
+ IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer();
+
+ CbPackage RequestPackage = ParsePackageMessage(Payload);
+ CbObject Request = RequestPackage.GetObject();
+ IoHash Checksum = Request["checksum"sv].AsHash();
+
+ std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments();
+ zen ::CbArrayView Entries = Request["entries"sv].AsArrayView();
+
+ ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'",
+ Entries.Num(),
+ Attachments.size(),
+ Checksum,
+ ProjectId,
+ OplogId);
+ std::vector<CbObject> Ops;
+ Ops.reserve(Entries.Num());
+ IoHashStream Hasher;
+ for (auto& OpEntry : Entries)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+
+ if (!Core["key"sv])
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "No oplog entry key specified");
+ }
+
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ Hasher.Append(OpView);
+ IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ Ops.emplace_back(Op);
+ }
+ IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
+
+ // Spread over multiple threads?
+ WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
+ std::atomic_int64_t JobCount = 0;
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() {
+ CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
+ JobCount.fetch_add(-1);
+ });
+ }
+ while (JobCount.load())
+ {
+ Sleep(1);
+ }
+
+ ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId);
+ for (const CbObject& Op : Ops)
+ {
+ const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op);
+ ZEN_DEBUG("oplog entry #{}", OpLsn);
+
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'",
+ ProjectId,
+ OplogId,
+ OpLsn,
+ NiceBytes(Op.GetSize()),
+ Op["key"sv].AsString());
+ }
+ ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId);
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ }
+ break;
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost | HttpVerb::kGet);
+ m_Router.RegisterRoute(
"{project}/oplog/{log}",
[this](HttpRouterRequest& Req) {
const auto& ProjectId = Req.GetCapture(1);
@@ -2120,7 +2428,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
std::vector<std::string> OpLogs = Project->ScanForOplogs();
CbObjectWriter Response;
- Response << "id"sv << Project->Identifier << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "id"sv << Project->Identifier;
+ Response << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "engine"sv << PathToUtf8(Project->EngineRootDir);
+ Response << "project"sv << PathToUtf8(Project->ProjectRootDir);
+ Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath);
Response.BeginArray("oplogs"sv);
for (const std::string& OplogId : OpLogs)
@@ -2190,24 +2502,32 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request)
namespace testutils {
using namespace std::literals;
- CbPackage CreateOplogPackage(const Oid& Id, const std::span<const CompressedBuffer>& Attachments)
+ std::string OidAsString(const Oid& Id)
+ {
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+ }
+
+ CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
- Object << "key"sv << Id;
+ Object << "key"sv << OidAsString(Id);
if (!Attachments.empty())
{
Object.BeginArray("bulkdata");
- for (const CompressedBuffer& Attachment : Attachments)
+ for (const auto& Attachment : Attachments)
{
+ CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash()));
Object.BeginObject();
- Object << "id" << Oid::NewOid();
- Object << "type"
- << "Standard";
- Object << "data" << CbAttachment(Attachment);
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
Object.EndObject();
- Package.AddAttachment(CbAttachment(Attachment));
+ Package.AddAttachment(Attach);
}
Object.EndArray();
}
@@ -2215,9 +2535,9 @@ namespace testutils {
return Package;
};
- std::vector<CompressedBuffer> CreateAttachments(const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
{
- std::vector<CompressedBuffer> Result;
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
@@ -2227,12 +2547,28 @@ namespace testutils {
{
Data[Idx] = Idx % 255;
}
-
- Result.emplace_back(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())));
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
}
return Result;
}
+ uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ {
+ if (RawOffset > 0)
+ {
+ uint64 BlockSize = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ return 0;
+ }
+ return BlockSize > 0 ? RawOffset % BlockSize : 0;
+ }
+ return 0;
+ }
+
} // namespace testutils
TEST_CASE("project.store.create")
@@ -2391,6 +2727,91 @@ TEST_CASE("project.store.gc")
}
}
+TEST_CASE("project.store.partial.read")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+ {
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ std::vector<Oid> OpIds;
+ OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv);
+ CHECK(Oplog != nullptr);
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+ Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99});
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+ for (auto It : Attachments)
+ {
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second));
+ }
+ }
+ {
+ IoBuffer Chunk;
+ CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(),
+ HttpContentType::kCompressedBinary,
+ Chunk) == HttpResponseCode::OK);
+ CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize());
+ }
+
+ IoBuffer ChunkResult;
+ CHECK(ProjectStore.GetChunk("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 0,
+ ~0ull,
+ HttpContentType::kCompressedBinary,
+ ChunkResult) == HttpResponseCode::OK);
+ CHECK(ChunkResult);
+ CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize());
+
+ IoBuffer PartialChunkResult;
+ CHECK(ProjectStore.GetChunk("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 5,
+ 1773,
+ HttpContentType::kCompressedBinary,
+ PartialChunkResult) == HttpResponseCode::OK);
+ CHECK(PartialChunkResult);
+ CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult));
+ CHECK(PartialCompressedResult.GetRawSize() >= 1773);
+
+ uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
+ SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
+ SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
+ const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
+ const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
+ CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+}
#endif
void