aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /zenserver/projectstore/projectstore.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'zenserver/projectstore/projectstore.cpp')
-rw-r--r--zenserver/projectstore/projectstore.cpp4082
1 files changed, 0 insertions, 4082 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
deleted file mode 100644
index 847a79a1d..000000000
--- a/zenserver/projectstore/projectstore.cpp
+++ /dev/null
@@ -1,4082 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "projectstore.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zenhttp/httpshared.h>
-#include <zenstore/caslog.h>
-#include <zenstore/cidstore.h>
-#include <zenstore/scrubcontext.h>
-#include <zenutil/cache/rpcrecording.h>
-
-#include "fileremoteprojectstore.h"
-#include "jupiterremoteprojectstore.h"
-#include "remoteprojectstore.h"
-#include "zenremoteprojectstore.h"
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-#include <xxh3.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#if ZEN_WITH_TESTS
-# include <zencore/testing.h>
-# include <zencore/testutils.h>
-#endif // ZEN_WITH_TESTS
-
-namespace zen {
-
-namespace {
- bool PrepareDirectoryDelete(const std::filesystem::path& Dir, std::filesystem::path& OutDeleteDir)
- {
- int DropIndex = 0;
- do
- {
- if (!std::filesystem::exists(Dir))
- {
- return true;
- }
-
- std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
- std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
- if (std::filesystem::exists(DroppedBucketPath))
- {
- DropIndex++;
- continue;
- }
-
- std::error_code Ec;
- std::filesystem::rename(Dir, DroppedBucketPath, Ec);
- if (!Ec)
- {
- OutDeleteDir = DroppedBucketPath;
- return true;
- }
- if (Ec && !std::filesystem::exists(DroppedBucketPath))
- {
- // We can't move our folder, probably because it is busy, bail..
- return false;
- }
- Sleep(100);
- } while (true);
- }
-
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize)
- {
- using namespace std::literals;
-
- std::unique_ptr<RemoteProjectStore> RemoteStore;
-
- if (CbObjectView File = Params["file"sv].AsObjectView(); File)
- {
- std::filesystem::path FolderPath(File["path"sv].AsString());
- if (FolderPath.empty())
- {
- return {nullptr, "Missing file path"};
- }
- std::string_view Name(File["name"sv].AsString());
- if (Name.empty())
- {
- return {nullptr, "Missing file name"};
- }
- bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
- bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
-
- FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- FolderPath,
- std::string(Name),
- ForceDisableBlocks,
- ForceEnableTempBlocks};
- RemoteStore = CreateFileRemoteStore(Options);
- }
-
- if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
- {
- std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
- if (CloudServiceUrl.empty())
- {
- return {nullptr, "Missing service url"};
- }
-
- std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl));
- std::string_view Namespace = Cloud["namespace"sv].AsString();
- if (Namespace.empty())
- {
- return {nullptr, "Missing namespace"};
- }
- std::string_view Bucket = Cloud["bucket"sv].AsString();
- if (Bucket.empty())
- {
- return {nullptr, "Missing bucket"};
- }
- std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
- std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
- if (AccessToken.empty())
- {
- std::string_view AccessTokenEnvVariable = Cloud["access-token-env"].AsString();
- if (!AccessTokenEnvVariable.empty())
- {
- AccessToken = GetEnvVariable(AccessTokenEnvVariable);
- }
- }
- std::string_view KeyParam = Cloud["key"sv].AsString();
- if (KeyParam.empty())
- {
- return {nullptr, "Missing key"};
- }
- if (KeyParam.length() != IoHash::StringLength)
- {
- return {nullptr, "Invalid key"};
- }
- IoHash Key = IoHash::FromHexString(KeyParam);
- if (Key == IoHash::Zero)
- {
- return {nullptr, "Invalid key string"};
- }
- bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
- bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
-
- JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- Key,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- ForceDisableBlocks,
- ForceDisableTempBlocks};
- RemoteStore = CreateJupiterRemoteStore(Options);
- }
-
- if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
- {
- std::string_view Url = Zen["url"sv].AsString();
- std::string_view Project = Zen["project"sv].AsString();
- if (Project.empty())
- {
- return {nullptr, "Missing project"};
- }
- std::string_view Oplog = Zen["oplog"sv].AsString();
- if (Oplog.empty())
- {
- return {nullptr, "Missing oplog"};
- }
- ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- std::string(Url),
- std::string(Project),
- std::string(Oplog)};
- RemoteStore = CreateZenRemoteStore(Options);
- }
-
- if (!RemoteStore)
- {
- return {nullptr, "Unknown remote store type"};
- }
-
- return {std::move(RemoteStore), ""};
- }
-
- std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
- {
- if (Result.ErrorCode == 0)
- {
- return {HttpResponseCode::OK, Result.Text};
- }
- return {static_cast<HttpResponseCode>(Result.ErrorCode),
- Result.Reason.empty() ? Result.Text
- : Result.Text.empty() ? Result.Reason
- : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)};
- }
-
- void CSVHeader(bool Details, bool AttachmentDetails, StringBuilderBase& CSVWriter)
- {
- if (AttachmentDetails)
- {
- CSVWriter << "Project, Oplog, LSN, Key, Cid, Size";
- }
- else if (Details)
- {
- CSVWriter << "Project, Oplog, LSN, Key, Size, AttachmentCount, AttachmentsSize";
- }
- else
- {
- CSVWriter << "Project, Oplog, Key";
- }
- }
-
- void CSVWriteOp(CidStore& CidStore,
- std::string_view ProjectId,
- std::string_view OplogId,
- bool Details,
- bool AttachmentDetails,
- int LSN,
- const Oid& Key,
- CbObject Op,
- StringBuilderBase& CSVWriter)
- {
- StringBuilder<32> KeyStringBuilder;
- Key.ToString(KeyStringBuilder);
- const std::string_view KeyString = KeyStringBuilder.ToView();
-
- SharedBuffer Buffer = Op.GetBuffer();
- if (AttachmentDetails)
- {
- Op.IterateAttachments([&CidStore, &CSVWriter, &ProjectId, &OplogId, LSN, &KeyString](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
- CSVWriter << "\r\n"
- << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << AttachmentHash.ToHexString()
- << ", " << gsl::narrow<uint64_t>(Attachment.GetSize());
- });
- }
- else if (Details)
- {
- uint64_t AttachmentCount = 0;
- size_t AttachmentsSize = 0;
- Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- AttachmentCount++;
- IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
- AttachmentsSize += Attachment.GetSize();
- });
- CSVWriter << "\r\n"
- << ProjectId << ", " << OplogId << ", " << LSN << ", " << KeyString << ", " << gsl::narrow<uint64_t>(Buffer.GetSize())
- << ", " << AttachmentCount << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
- }
- else
- {
- CSVWriter << "\r\n" << ProjectId << ", " << OplogId << ", " << KeyString;
- }
- };
-
- void CbWriteOp(CidStore& CidStore,
- bool Details,
- bool OpDetails,
- bool AttachmentDetails,
- int LSN,
- const Oid& Key,
- CbObject Op,
- CbObjectWriter& CbWriter)
- {
- CbWriter.BeginObject();
- {
- SharedBuffer Buffer = Op.GetBuffer();
- CbWriter.AddObjectId("key", Key);
- if (Details)
- {
- CbWriter.AddInteger("lsn", LSN);
- CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Buffer.GetSize()));
- }
- if (AttachmentDetails)
- {
- CbWriter.BeginArray("attachments");
- Op.IterateAttachments([&CidStore, &CbWriter](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- CbWriter.BeginObject();
- {
- IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
- CbWriter.AddString("cid", AttachmentHash.ToHexString());
- CbWriter.AddInteger("size", gsl::narrow<uint64_t>(Attachment.GetSize()));
- }
- CbWriter.EndObject();
- });
- CbWriter.EndArray();
- }
- else if (Details)
- {
- uint64_t AttachmentCount = 0;
- size_t AttachmentsSize = 0;
- Op.IterateAttachments([&CidStore, &AttachmentCount, &AttachmentsSize](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- AttachmentCount++;
- IoBuffer Attachment = CidStore.FindChunkByCid(AttachmentHash);
- AttachmentsSize += Attachment.GetSize();
- });
- if (AttachmentCount > 0)
- {
- CbWriter.AddInteger("attachments", AttachmentCount);
- CbWriter.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
- }
- }
- if (OpDetails)
- {
- CbWriter.BeginObject("op");
- for (const CbFieldView& Field : Op)
- {
- if (!Field.HasName())
- {
- CbWriter.AddField(Field);
- continue;
- }
- std::string_view FieldName = Field.GetName();
- CbWriter.AddField(FieldName, Field);
- }
- CbWriter.EndObject();
- }
- }
- CbWriter.EndObject();
- };
-
- void CbWriteOplogOps(CidStore& CidStore,
- ProjectStore::Oplog& Oplog,
- bool Details,
- bool OpDetails,
- bool AttachmentDetails,
- CbObjectWriter& Cbo)
- {
- Cbo.BeginArray("ops");
- {
- Oplog.IterateOplogWithKey([&Cbo, &CidStore, Details, OpDetails, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
- CbWriteOp(CidStore, Details, OpDetails, AttachmentDetails, LSN, Key, Op, Cbo);
- });
- }
- Cbo.EndArray();
- }
-
- void CbWriteOplog(CidStore& CidStore,
- ProjectStore::Oplog& Oplog,
- bool Details,
- bool OpDetails,
- bool AttachmentDetails,
- CbObjectWriter& Cbo)
- {
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Oplog.OplogId());
- CbWriteOplogOps(CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
- }
- Cbo.EndObject();
- }
-
- void CbWriteOplogs(CidStore& CidStore,
- ProjectStore::Project& Project,
- std::vector<std::string> OpLogs,
- bool Details,
- bool OpDetails,
- bool AttachmentDetails,
- CbObjectWriter& Cbo)
- {
- Cbo.BeginArray("oplogs");
- {
- for (const std::string& OpLogId : OpLogs)
- {
- ProjectStore::Oplog* Oplog = Project.OpenOplog(OpLogId);
- if (Oplog != nullptr)
- {
- CbWriteOplog(CidStore, *Oplog, Details, OpDetails, AttachmentDetails, Cbo);
- }
- }
- }
- Cbo.EndArray();
- }
-
- void CbWriteProject(CidStore& CidStore,
- ProjectStore::Project& Project,
- std::vector<std::string> OpLogs,
- bool Details,
- bool OpDetails,
- bool AttachmentDetails,
- CbObjectWriter& Cbo)
- {
- Cbo.BeginObject();
- {
- Cbo.AddString("name", Project.Identifier);
- CbWriteOplogs(CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
- }
- Cbo.EndObject();
- }
-
-} // namespace
-
-//////////////////////////////////////////////////////////////////////////
-
-Oid
-OpKeyStringAsOId(std::string_view OpKey)
-{
- using namespace std::literals;
-
- CbObjectWriter Writer;
- Writer << "key"sv << OpKey;
-
- XXH3_128Stream KeyHasher;
- Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
-
- Oid OpId;
- memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits));
-
- return OpId;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-struct ProjectStore::OplogStorage : public RefCounted
-{
- OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
- {
- }
-
- ~OplogStorage()
- {
- ZEN_INFO("closing oplog storage at {}", m_OplogStoragePath);
- Flush();
- }
-
- [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); }
- [[nodiscard]] static bool Exists(std::filesystem::path BasePath)
- {
- return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops");
- }
-
- static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); }
-
- uint64_t OpBlobsSize() const
- {
- RwLock::SharedLockScope _(m_RwLock);
- return m_NextOpsOffset;
- }
-
- void Open(bool IsCreate)
- {
- using namespace std::literals;
-
- ZEN_INFO("initializing oplog storage at '{}'", m_OplogStoragePath);
-
- if (IsCreate)
- {
- DeleteDirectories(m_OplogStoragePath);
- CreateDirectories(m_OplogStoragePath);
- }
-
- m_Oplog.Open(m_OplogStoragePath / "ops.zlog"sv, IsCreate ? CasLogFile::Mode::kTruncate : CasLogFile::Mode::kWrite);
- m_Oplog.Initialize();
-
- m_OpBlobs.Open(m_OplogStoragePath / "ops.zops"sv, IsCreate ? BasicFile::Mode::kTruncate : BasicFile::Mode::kWrite);
-
- ZEN_ASSERT(IsPow2(m_OpsAlign));
- ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
- }
-
- void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler)
- {
- ZEN_TRACE_CPU("ProjectStore::OplogStorage::ReplayLog");
-
- // This could use memory mapping or do something clever but for now it just reads the file sequentially
-
- ZEN_INFO("replaying log for '{}'", m_OplogStoragePath);
-
- Stopwatch Timer;
-
- uint64_t InvalidEntries = 0;
-
- IoBuffer OpBuffer;
- m_Oplog.Replay(
- [&](const OplogEntry& LogEntry) {
- if (LogEntry.OpCoreSize == 0)
- {
- ++InvalidEntries;
-
- return;
- }
-
- if (OpBuffer.GetSize() < LogEntry.OpCoreSize)
- {
- OpBuffer = IoBuffer(LogEntry.OpCoreSize);
- }
-
- const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
-
- m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
-
- // Verify checksum, ignore op data if incorrect
- const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), LogEntry.OpCoreSize) & 0xffffFFFF);
-
- if (OpCoreHash != LogEntry.OpCoreHash)
- {
- ZEN_WARN("skipping oplog entry with bad checksum!");
- return;
- }
-
- CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), LogEntry.OpCoreSize));
-
- m_NextOpsOffset =
- Max(m_NextOpsOffset.load(std::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
- m_MaxLsn = Max(m_MaxLsn.load(std::memory_order_relaxed), LogEntry.OpLsn);
-
- Handler(Op, LogEntry);
- },
- 0);
-
- if (InvalidEntries)
- {
- ZEN_WARN("ignored {} zero-sized oplog entries", InvalidEntries);
- }
-
- ZEN_INFO("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- m_MaxLsn,
- m_NextOpsOffset);
- }
-
- void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
- {
- for (const OplogEntryAddress& Entry : Entries)
- {
- CbObject Op = GetOp(Entry);
- Handler(Op);
- }
- }
-
- CbObject GetOp(const OplogEntryAddress& Entry)
- {
- IoBuffer OpBuffer(Entry.Size);
-
- const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
- m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
-
- return CbObject(SharedBuffer(std::move(OpBuffer)));
- }
-
- OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash)
- {
- ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp");
-
- using namespace std::literals;
-
- uint64_t WriteSize = Buffer.GetSize();
-
- RwLock::ExclusiveLockScope Lock(m_RwLock);
- const uint64_t WriteOffset = m_NextOpsOffset;
- const uint32_t OpLsn = ++m_MaxLsn;
- m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
- Lock.ReleaseNow();
-
- ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
-
- OplogEntry Entry = {.OpLsn = OpLsn,
- .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
- .OpCoreSize = uint32_t(Buffer.GetSize()),
- .OpCoreHash = OpCoreHash,
- .OpKeyHash = KeyHash};
-
- m_Oplog.Append(Entry);
- m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
-
- return Entry;
- }
-
- void Flush()
- {
- m_Oplog.Flush();
- m_OpBlobs.Flush();
- }
-
- spdlog::logger& Log() { return m_OwnerOplog->Log(); }
-
-private:
- ProjectStore::Oplog* m_OwnerOplog;
- std::filesystem::path m_OplogStoragePath;
- mutable RwLock m_RwLock;
- TCasLogFile<OplogEntry> m_Oplog;
- BasicFile m_OpBlobs;
- std::atomic<uint64_t> m_NextOpsOffset{0};
- uint64_t m_OpsAlign = 32;
- std::atomic<uint32_t> m_MaxLsn{0};
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-ProjectStore::Oplog::Oplog(std::string_view Id,
- Project* Project,
- CidStore& Store,
- std::filesystem::path BasePath,
- const std::filesystem::path& MarkerPath)
-: m_OuterProject(Project)
-, m_CidStore(Store)
-, m_BasePath(BasePath)
-, m_MarkerPath(MarkerPath)
-, m_OplogId(Id)
-{
- using namespace std::literals;
-
- m_Storage = new OplogStorage(this, m_BasePath);
- const bool StoreExists = m_Storage->Exists();
- m_Storage->Open(/* IsCreate */ !StoreExists);
-
- m_TempPath = m_BasePath / "temp"sv;
-
- CleanDirectory(m_TempPath);
-}
-
-ProjectStore::Oplog::~Oplog()
-{
- if (m_Storage)
- {
- Flush();
- }
-}
-
-void
-ProjectStore::Oplog::Flush()
-{
- ZEN_ASSERT(m_Storage);
- m_Storage->Flush();
-}
-
-void
-ProjectStore::Oplog::Scrub(ScrubContext& Ctx) const
-{
- ZEN_UNUSED(Ctx);
-}
-
-void
-ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
-{
- RwLock::SharedLockScope _(m_OplogLock);
-
- std::vector<IoHash> Hashes;
- Hashes.reserve(Max(m_ChunkMap.size(), m_MetaMap.size()));
-
- for (const auto& Kv : m_ChunkMap)
- {
- Hashes.push_back(Kv.second);
- }
-
- GcCtx.AddRetainedCids(Hashes);
-
- Hashes.clear();
-
- for (const auto& Kv : m_MetaMap)
- {
- Hashes.push_back(Kv.second);
- }
-
- GcCtx.AddRetainedCids(Hashes);
-}
-
-uint64_t
-ProjectStore::Oplog::TotalSize() const
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (m_Storage)
- {
- return m_Storage->OpBlobsSize();
- }
- return 0;
-}
-
-bool
-ProjectStore::Oplog::IsExpired() const
-{
- if (m_MarkerPath.empty())
- {
- return false;
- }
- return !std::filesystem::exists(m_MarkerPath);
-}
-
-std::filesystem::path
-ProjectStore::Oplog::PrepareForDelete(bool MoveFolder)
-{
- RwLock::ExclusiveLockScope _(m_OplogLock);
- m_ChunkMap.clear();
- m_MetaMap.clear();
- m_FileMap.clear();
- m_OpAddressMap.clear();
- m_LatestOpMap.clear();
- m_Storage = {};
- if (!MoveFolder)
- {
- return {};
- }
- std::filesystem::path MovedDir;
- if (PrepareDirectoryDelete(m_BasePath, MovedDir))
- {
- return MovedDir;
- }
- return {};
-}
-
-bool
-ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
-{
- using namespace std::literals;
-
- std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv;
- return std::filesystem::is_regular_file(StateFilePath);
-}
-
-void
-ProjectStore::Oplog::Read()
-{
- using namespace std::literals;
-
- std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
- if (std::filesystem::is_regular_file(StateFilePath))
- {
- ZEN_INFO("reading config for oplog '{}' in project '{}' from {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
-
- BasicFile Blob;
- Blob.Open(StateFilePath, BasicFile::Mode::kRead);
-
- IoBuffer Obj = Blob.ReadAll();
- CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
-
- if (ValidationError != CbValidateError::None)
- {
- ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), StateFilePath);
- return;
- }
-
- CbObject Cfg = LoadCompactBinaryObject(Obj);
-
- m_MarkerPath = Cfg["gcpath"sv].AsString();
- }
- else
- {
- ZEN_INFO("config for oplog '{}' in project '{}' not found at {}. Assuming legacy store",
- m_OplogId,
- m_OuterProject->Identifier,
- StateFilePath);
- }
- ReplayLog();
-}
-
-void
-ProjectStore::Oplog::Write()
-{
- using namespace std::literals;
-
- BinaryWriter Mem;
-
- CbObjectWriter Cfg;
-
- Cfg << "gcpath"sv << PathToUtf8(m_MarkerPath);
-
- Cfg.Save(Mem);
-
- std::filesystem::path StateFilePath = m_BasePath / "oplog.zcb"sv;
-
- ZEN_INFO("persisting config for oplog '{}' in project '{}' to {}", m_OplogId, m_OuterProject->Identifier, StateFilePath);
-
- BasicFile Blob;
- Blob.Open(StateFilePath, BasicFile::Mode::kTruncate);
- Blob.Write(Mem.Data(), Mem.Size(), 0);
- Blob.Flush();
-}
-
-void
-ProjectStore::Oplog::ReplayLog()
-{
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- if (!m_Storage)
- {
- return;
- }
- m_Storage->ReplayLog(
- [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); });
-}
-
-IoBuffer
-ProjectStore::Oplog::FindChunk(Oid ChunkId)
-{
- RwLock::SharedLockScope OplogLock(m_OplogLock);
- if (!m_Storage)
- {
- return IoBuffer{};
- }
-
- if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
- {
- IoHash ChunkHash = ChunkIt->second;
- OplogLock.ReleaseNow();
-
- IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
-
- return Chunk;
- }
-
- if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
- {
- std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath;
-
- OplogLock.ReleaseNow();
-
- IoBuffer FileChunk = IoBufferBuilder::MakeFromFile(FilePath);
- FileChunk.SetContentType(ZenContentType::kBinary);
-
- return FileChunk;
- }
-
- if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
- {
- IoHash ChunkHash = MetaIt->second;
- OplogLock.ReleaseNow();
-
- IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkHash);
- Chunk.SetContentType(ZenContentType::kCompressedBinary);
-
- return Chunk;
- }
-
- return {};
-}
-
-void
-ProjectStore::Oplog::IterateFileMap(
- std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return;
- }
-
- for (const auto& Kv : m_FileMap)
- {
- Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath);
- }
-}
-
-void
-ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return;
- }
-
- std::vector<OplogEntryAddress> Entries;
- Entries.reserve(m_LatestOpMap.size());
-
- for (const auto& Kv : m_LatestOpMap)
- {
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- Entries.push_back(AddressEntry->second);
- }
-
- std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
- return Lhs.Offset < Rhs.Offset;
- });
-
- m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); });
-}
-
-void
-ProjectStore::Oplog::IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Handler)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return;
- }
-
- std::vector<size_t> EntryIndexes;
- std::vector<OplogEntryAddress> Entries;
- std::vector<Oid> Keys;
- std::vector<int> LSNs;
- Entries.reserve(m_LatestOpMap.size());
- EntryIndexes.reserve(m_LatestOpMap.size());
- Keys.reserve(m_LatestOpMap.size());
- LSNs.reserve(m_LatestOpMap.size());
-
- for (const auto& Kv : m_LatestOpMap)
- {
- const auto AddressEntry = m_OpAddressMap.find(Kv.second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- Entries.push_back(AddressEntry->second);
- Keys.push_back(Kv.first);
- LSNs.push_back(Kv.second);
- EntryIndexes.push_back(EntryIndexes.size());
- }
-
- std::sort(EntryIndexes.begin(), EntryIndexes.end(), [&Entries](const size_t& Lhs, const size_t& Rhs) {
- const OplogEntryAddress& LhsEntry = Entries[Lhs];
- const OplogEntryAddress& RhsEntry = Entries[Rhs];
- return LhsEntry.Offset < RhsEntry.Offset;
- });
- std::vector<OplogEntryAddress> SortedEntries;
- SortedEntries.reserve(EntryIndexes.size());
- for (size_t Index : EntryIndexes)
- {
- SortedEntries.push_back(Entries[Index]);
- }
-
- size_t EntryIndex = 0;
- m_Storage->ReplayLog(SortedEntries, [&](CbObject Op) {
- Handler(LSNs[EntryIndex], Keys[EntryIndex], Op);
- EntryIndex++;
- });
-}
-
-int
-ProjectStore::Oplog::GetOpIndexByKey(const Oid& Key)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return {};
- }
- if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
- {
- return LatestOp->second;
- }
- return -1;
-}
-
-std::optional<CbObject>
-ProjectStore::Oplog::GetOpByKey(const Oid& Key)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return {};
- }
-
- if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
- {
- const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
- ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
-
- return m_Storage->GetOp(AddressEntry->second);
- }
-
- return {};
-}
-
-std::optional<CbObject>
-ProjectStore::Oplog::GetOpByIndex(int Index)
-{
- RwLock::SharedLockScope _(m_OplogLock);
- if (!m_Storage)
- {
- return {};
- }
-
- if (const auto AddressEntryIt = m_OpAddressMap.find(Index); AddressEntryIt != m_OpAddressMap.end())
- {
- return m_Storage->GetOp(AddressEntryIt->second);
- }
-
- return {};
-}
-
-void
-ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
- Oid FileId,
- IoHash Hash,
- std::string_view ServerPath,
- std::string_view ClientPath)
-{
- if (Hash != IoHash::Zero)
- {
- m_ChunkMap.insert_or_assign(FileId, Hash);
- }
-
- FileMapEntry Entry;
- Entry.ServerPath = ServerPath;
- Entry.ClientPath = ClientPath;
-
- m_FileMap[FileId] = std::move(Entry);
-
- if (Hash != IoHash::Zero)
- {
- m_ChunkMap.insert_or_assign(FileId, Hash);
- }
-}
-
-void
-ProjectStore::Oplog::AddChunkMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
-{
- m_ChunkMap.insert_or_assign(ChunkId, Hash);
-}
-
-void
-ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid ChunkId, IoHash Hash)
-{
- m_MetaMap.insert_or_assign(ChunkId, Hash);
-}
-
-ProjectStore::Oplog::OplogEntryMapping
-ProjectStore::Oplog::GetMapping(CbObject Core)
-{
- using namespace std::literals;
-
- OplogEntryMapping Result;
-
- // Update chunk id maps
- CbObjectView PackageObj = Core["package"sv].AsObjectView();
- CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView();
- CbArrayView PackageDataArray = Core["packagedata"sv].AsArrayView();
- Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num() + PackageDataArray.Num());
-
- if (PackageObj)
- {
- Oid Id = PackageObj["id"sv].AsObjectId();
- IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package data {} -> {}", Id, Hash);
- }
-
- for (CbFieldView& Entry : PackageDataArray)
- {
- CbObjectView PackageDataObj = Entry.AsObjectView();
- Oid Id = PackageDataObj["id"sv].AsObjectId();
- IoHash Hash = PackageDataObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("package {} -> {}", Id, Hash);
- }
-
- for (CbFieldView& Entry : BulkDataArray)
- {
- CbObjectView BulkObj = Entry.AsObjectView();
- Oid Id = BulkObj["id"sv].AsObjectId();
- IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
- Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
- }
-
- CbArrayView FilesArray = Core["files"sv].AsArrayView();
- Result.Files.reserve(FilesArray.Num());
- for (CbFieldView& Entry : FilesArray)
- {
- CbObjectView FileObj = Entry.AsObjectView();
-
- std::string_view ServerPath = FileObj["serverpath"sv].AsString();
- std::string_view ClientPath = FileObj["clientpath"sv].AsString();
- if (ServerPath.empty() || ClientPath.empty())
- {
- ZEN_WARN("invalid file");
- continue;
- }
-
- Oid Id = FileObj["id"sv].AsObjectId();
- IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
- Result.Files.emplace_back(
- OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)});
- ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
- }
-
- CbArrayView MetaArray = Core["meta"sv].AsArrayView();
- Result.Meta.reserve(MetaArray.Num());
- for (CbFieldView& Entry : MetaArray)
- {
- CbObjectView MetaObj = Entry.AsObjectView();
- Oid Id = MetaObj["id"sv].AsObjectId();
- IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
- Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
- auto NameString = MetaObj["name"sv].AsString();
- ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
- }
-
- return Result;
-}
-
-uint32_t
-ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
- const OplogEntryMapping& OpMapping,
- const OplogEntry& OpEntry,
- UpdateType TypeOfUpdate)
-{
- ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry");
-
- ZEN_UNUSED(TypeOfUpdate);
-
- // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
- // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
-
- using namespace std::literals;
-
- // Update chunk id maps
- for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks)
- {
- AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash);
- }
-
- for (const OplogEntryMapping::FileMapping& File : OpMapping.Files)
- {
- AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath);
- }
-
- for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta)
- {
- AddMetaMapping(OplogLock, Meta.Id, Meta.Hash);
- }
-
- m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
- m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn;
-
- return OpEntry.OpLsn;
-}
-
-uint32_t
-ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
-{
- ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
-
- const CbObject& Core = OpPackage.GetObject();
- const uint32_t EntryId = AppendNewOplogEntry(Core);
- if (EntryId == 0xffffffffu)
- {
- // The oplog has been deleted so just drop this
- return EntryId;
- }
-
- // Persist attachments after oplog entry so GC won't find attachments without references
-
- uint64_t AttachmentBytes = 0;
- uint64_t NewAttachmentBytes = 0;
-
- auto Attachments = OpPackage.GetAttachments();
-
- for (const auto& Attach : Attachments)
- {
- ZEN_ASSERT(Attach.IsCompressedBinary());
-
- CompressedBuffer AttachmentData = Attach.AsCompressedBinary();
- const uint64_t AttachmentSize = AttachmentData.DecodeRawSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(AttachmentData.GetCompressed().Flatten().AsIoBuffer(), Attach.GetHash());
-
- if (InsertResult.New)
- {
- NewAttachmentBytes += AttachmentSize;
- }
- AttachmentBytes += AttachmentSize;
- }
-
- ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
-
- return EntryId;
-}
-
-uint32_t
-ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
-{
- ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
-
- using namespace std::literals;
-
- OplogEntryMapping Mapping = GetMapping(Core);
-
- SharedBuffer Buffer = Core.GetBuffer();
- const uint64_t WriteSize = Buffer.GetSize();
- const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
-
- ZEN_ASSERT(WriteSize != 0);
-
- XXH3_128Stream KeyHasher;
- Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
-
- RefPtr<OplogStorage> Storage;
- {
- RwLock::SharedLockScope _(m_OplogLock);
- Storage = m_Storage;
- }
- if (!m_Storage)
- {
- return 0xffffffffu;
- }
- const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash);
-
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
- const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry);
-
- return EntryId;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-ProjectStore::Project::Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath)
-: m_ProjectStore(PrjStore)
-, m_CidStore(Store)
-, m_OplogStoragePath(BasePath)
-{
-}
-
-ProjectStore::Project::~Project()
-{
-}
-
-bool
-ProjectStore::Project::Exists(std::filesystem::path BasePath)
-{
- return std::filesystem::exists(BasePath / "Project.zcb");
-}
-
-void
-ProjectStore::Project::Read()
-{
- using namespace std::literals;
-
- std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
-
- ZEN_INFO("reading config for project '{}' from {}", Identifier, ProjectStateFilePath);
-
- BasicFile Blob;
- Blob.Open(ProjectStateFilePath, BasicFile::Mode::kRead);
-
- IoBuffer Obj = Blob.ReadAll();
- CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
-
- if (ValidationError == CbValidateError::None)
- {
- CbObject Cfg = LoadCompactBinaryObject(Obj);
-
- Identifier = Cfg["id"sv].AsString();
- RootDir = Cfg["root"sv].AsString();
- ProjectRootDir = Cfg["project"sv].AsString();
- EngineRootDir = Cfg["engine"sv].AsString();
- ProjectFilePath = Cfg["projectfile"sv].AsString();
- }
- else
- {
- ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath);
- }
-}
-
-void
-ProjectStore::Project::Write()
-{
- using namespace std::literals;
-
- BinaryWriter Mem;
-
- CbObjectWriter Cfg;
- Cfg << "id"sv << Identifier;
- Cfg << "root"sv << PathToUtf8(RootDir);
- Cfg << "project"sv << ProjectRootDir;
- Cfg << "engine"sv << EngineRootDir;
- Cfg << "projectfile"sv << ProjectFilePath;
-
- Cfg.Save(Mem);
-
- CreateDirectories(m_OplogStoragePath);
-
- std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb"sv;
-
- ZEN_INFO("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath);
-
- BasicFile Blob;
- Blob.Open(ProjectStateFilePath, BasicFile::Mode::kTruncate);
- Blob.Write(Mem.Data(), Mem.Size(), 0);
- Blob.Flush();
-}
-
-spdlog::logger&
-ProjectStore::Project::Log()
-{
- return m_ProjectStore->Log();
-}
-
-std::filesystem::path
-ProjectStore::Project::BasePathForOplog(std::string_view OplogId)
-{
- return m_OplogStoragePath / OplogId;
-}
-
-ProjectStore::Oplog*
-ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem::path& MarkerPath)
-{
- RwLock::ExclusiveLockScope _(m_ProjectLock);
-
- std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
-
- try
- {
- Oplog* Log = m_Oplogs
- .try_emplace(std::string{OplogId},
- std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, MarkerPath))
- .first->second.get();
-
- Log->Write();
- return Log;
- }
- catch (std::exception&)
- {
- // In case of failure we need to ensure there's no half constructed entry around
- //
- // (This is probably already ensured by the try_emplace implementation?)
-
- m_Oplogs.erase(std::string{OplogId});
-
- return nullptr;
- }
-}
-
-ProjectStore::Oplog*
-ProjectStore::Project::OpenOplog(std::string_view OplogId)
-{
- {
- RwLock::SharedLockScope _(m_ProjectLock);
-
- auto OplogIt = m_Oplogs.find(std::string(OplogId));
-
- if (OplogIt != m_Oplogs.end())
- {
- return OplogIt->second.get();
- }
- }
-
- RwLock::ExclusiveLockScope _(m_ProjectLock);
-
- std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
-
- if (Oplog::ExistsAt(OplogBasePath))
- {
- // Do open of existing oplog
-
- try
- {
- Oplog* Log =
- m_Oplogs
- .try_emplace(std::string{OplogId},
- std::make_unique<ProjectStore::Oplog>(OplogId, this, m_CidStore, OplogBasePath, std::filesystem::path{}))
- .first->second.get();
- Log->Read();
-
- return Log;
- }
- catch (std::exception& ex)
- {
- ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
-
- m_Oplogs.erase(std::string{OplogId});
- }
- }
-
- return nullptr;
-}
-
-void
-ProjectStore::Project::DeleteOplog(std::string_view OplogId)
-{
- std::filesystem::path DeletePath;
- {
- RwLock::ExclusiveLockScope _(m_ProjectLock);
-
- auto OplogIt = m_Oplogs.find(std::string(OplogId));
-
- if (OplogIt != m_Oplogs.end())
- {
- std::unique_ptr<Oplog>& Oplog = OplogIt->second;
- DeletePath = Oplog->PrepareForDelete(true);
- m_DeletedOplogs.emplace_back(std::move(Oplog));
- m_Oplogs.erase(OplogIt);
- }
- }
-
- // Erase content on disk
- if (!DeletePath.empty())
- {
- OplogStorage::Delete(DeletePath);
- }
-}
-
-std::vector<std::string>
-ProjectStore::Project::ScanForOplogs() const
-{
- DirectoryContent DirContent;
- GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent);
- std::vector<std::string> Oplogs;
- Oplogs.reserve(DirContent.Directories.size());
- for (const std::filesystem::path& DirPath : DirContent.Directories)
- {
- Oplogs.push_back(DirPath.filename().string());
- }
- return Oplogs;
-}
-
-void
-ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const
-{
- RwLock::SharedLockScope _(m_ProjectLock);
-
- for (auto& Kv : m_Oplogs)
- {
- Fn(*Kv.second);
- }
-}
-
-void
-ProjectStore::Project::IterateOplogs(std::function<void(Oplog&)>&& Fn)
-{
- RwLock::SharedLockScope _(m_ProjectLock);
-
- for (auto& Kv : m_Oplogs)
- {
- Fn(*Kv.second);
- }
-}
-
-void
-ProjectStore::Project::Flush()
-{
- // We only need to flush oplogs that we have already loaded
- IterateOplogs([&](Oplog& Ops) { Ops.Flush(); });
-}
-
-void
-ProjectStore::Project::Scrub(ScrubContext& Ctx)
-{
- // Scrubbing needs to check all existing oplogs
- std::vector<std::string> OpLogs = ScanForOplogs();
- for (const std::string& OpLogId : OpLogs)
- {
- OpenOplog(OpLogId);
- }
- IterateOplogs([&](const Oplog& Ops) {
- if (!Ops.IsExpired())
- {
- Ops.Scrub(Ctx);
- }
- });
-}
-
-void
-ProjectStore::Project::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("ProjectStore::Project::GatherReferences");
-
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- ZEN_DEBUG("gathered references from project store project {} in {}", Identifier, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- // GatherReferences needs to check all existing oplogs
- std::vector<std::string> OpLogs = ScanForOplogs();
- for (const std::string& OpLogId : OpLogs)
- {
- OpenOplog(OpLogId);
- }
- IterateOplogs([&](Oplog& Ops) {
- if (!Ops.IsExpired())
- {
- Ops.GatherReferences(GcCtx);
- }
- });
-}
-
-uint64_t
-ProjectStore::Project::TotalSize() const
-{
- uint64_t Result = 0;
- {
- RwLock::SharedLockScope _(m_ProjectLock);
- for (const auto& It : m_Oplogs)
- {
- Result += It.second->TotalSize();
- }
- }
- return Result;
-}
-
-bool
-ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath)
-{
- RwLock::ExclusiveLockScope _(m_ProjectLock);
-
- for (auto& It : m_Oplogs)
- {
- // We don't care about the moved folder
- It.second->PrepareForDelete(false);
- m_DeletedOplogs.emplace_back(std::move(It.second));
- }
-
- m_Oplogs.clear();
-
- bool Success = PrepareDirectoryDelete(m_OplogStoragePath, OutDeletePath);
- if (!Success)
- {
- return false;
- }
- m_OplogStoragePath.clear();
- return true;
-}
-
-bool
-ProjectStore::Project::IsExpired() const
-{
- if (ProjectFilePath.empty())
- {
- return false;
- }
- return !std::filesystem::exists(ProjectFilePath);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
-: GcStorage(Gc)
-, GcContributor(Gc)
-, m_Log(logging::Get("project"))
-, m_CidStore(Store)
-, m_ProjectBasePath(BasePath)
-{
- ZEN_INFO("initializing project store at '{}'", BasePath);
- // m_Log.set_level(spdlog::level::debug);
-}
-
-ProjectStore::~ProjectStore()
-{
- ZEN_INFO("closing project store ('{}')", m_ProjectBasePath);
-}
-
-std::filesystem::path
-ProjectStore::BasePathForProject(std::string_view ProjectId)
-{
- return m_ProjectBasePath / ProjectId;
-}
-
-void
-ProjectStore::DiscoverProjects()
-{
- if (!std::filesystem::exists(m_ProjectBasePath))
- {
- return;
- }
-
- DirectoryContent DirContent;
- GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent);
-
- for (const std::filesystem::path& DirPath : DirContent.Directories)
- {
- std::string DirName = PathToUtf8(DirPath.filename());
- OpenProject(DirName);
- }
-}
-
-void
-ProjectStore::IterateProjects(std::function<void(Project& Prj)>&& Fn)
-{
- RwLock::SharedLockScope _(m_ProjectsLock);
-
- for (auto& Kv : m_Projects)
- {
- Fn(*Kv.second.Get());
- }
-}
-
-void
-ProjectStore::Flush()
-{
- std::vector<Ref<Project>> Projects;
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
- Projects.reserve(m_Projects.size());
-
- for (auto& Kv : m_Projects)
- {
- Projects.push_back(Kv.second);
- }
- }
- for (const Ref<Project>& Project : Projects)
- {
- Project->Flush();
- }
-}
-
-void
-ProjectStore::Scrub(ScrubContext& Ctx)
-{
- DiscoverProjects();
-
- std::vector<Ref<Project>> Projects;
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
- Projects.reserve(m_Projects.size());
-
- for (auto& Kv : m_Projects)
- {
- if (Kv.second->IsExpired())
- {
- continue;
- }
- Projects.push_back(Kv.second);
- }
- }
- for (const Ref<Project>& Project : Projects)
- {
- Project->Scrub(Ctx);
- }
-}
-
-void
-ProjectStore::GatherReferences(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("ProjectStore::GatherReferences");
-
- size_t ProjectCount = 0;
- size_t ExpiredProjectCount = 0;
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- ZEN_DEBUG("gathered references from '{}' in {}, found {} active projects and {} expired projects",
- m_ProjectBasePath.string(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- ProjectCount,
- ExpiredProjectCount);
- });
-
- DiscoverProjects();
-
- std::vector<Ref<Project>> Projects;
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
- Projects.reserve(m_Projects.size());
-
- for (auto& Kv : m_Projects)
- {
- if (Kv.second->IsExpired())
- {
- ExpiredProjectCount++;
- continue;
- }
- Projects.push_back(Kv.second);
- }
- }
- ProjectCount = Projects.size();
- for (const Ref<Project>& Project : Projects)
- {
- Project->GatherReferences(GcCtx);
- }
-}
-
-void
-ProjectStore::CollectGarbage(GcContext& GcCtx)
-{
- ZEN_TRACE_CPU("ProjectStore::CollectGarbage");
-
- size_t ProjectCount = 0;
- size_t ExpiredProjectCount = 0;
-
- Stopwatch Timer;
- const auto Guard = MakeGuard([&] {
- ZEN_DEBUG("garbage collect from '{}' DONE after {}, found {} active projects and {} expired projects",
- m_ProjectBasePath.string(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- ProjectCount,
- ExpiredProjectCount);
- });
- std::vector<Ref<Project>> ExpiredProjects;
- std::vector<Ref<Project>> Projects;
-
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
- for (auto& Kv : m_Projects)
- {
- if (Kv.second->IsExpired())
- {
- ExpiredProjects.push_back(Kv.second);
- ExpiredProjectCount++;
- continue;
- }
- Projects.push_back(Kv.second);
- ProjectCount++;
- }
- }
-
- if (!GcCtx.IsDeletionMode())
- {
- ZEN_DEBUG("garbage collect DISABLED, for '{}' ", m_ProjectBasePath.string());
- return;
- }
-
- for (const Ref<Project>& Project : Projects)
- {
- std::vector<std::string> ExpiredOplogs;
- {
- RwLock::ExclusiveLockScope _(m_ProjectsLock);
- Project->IterateOplogs([&ExpiredOplogs](ProjectStore::Oplog& Oplog) {
- if (Oplog.IsExpired())
- {
- ExpiredOplogs.push_back(Oplog.OplogId());
- }
- });
- }
- for (const std::string& OplogId : ExpiredOplogs)
- {
- ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected oplog '{}' in project '{}'. Removing storage on disk",
- OplogId,
- Project->Identifier);
- Project->DeleteOplog(OplogId);
- }
- }
-
- if (ExpiredProjects.empty())
- {
- ZEN_DEBUG("garbage collect for '{}', no expired projects found", m_ProjectBasePath.string());
- return;
- }
-
- for (const Ref<Project>& Project : ExpiredProjects)
- {
- std::filesystem::path PathToRemove;
- std::string ProjectId;
- {
- RwLock::ExclusiveLockScope _(m_ProjectsLock);
- if (!Project->IsExpired())
- {
- ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project no longer expired.", ProjectId);
- continue;
- }
- bool Success = Project->PrepareForDelete(PathToRemove);
- if (!Success)
- {
- ZEN_DEBUG("ProjectStore::CollectGarbage skipped garbage collect of project '{}'. Project folder is locked.", ProjectId);
- continue;
- }
- m_Projects.erase(Project->Identifier);
- ProjectId = Project->Identifier;
- }
-
- ZEN_DEBUG("ProjectStore::CollectGarbage garbage collected project '{}'. Removing storage on disk", ProjectId);
- if (PathToRemove.empty())
- {
- continue;
- }
-
- DeleteDirectories(PathToRemove);
- }
-}
-
-GcStorageSize
-ProjectStore::StorageSize() const
-{
- GcStorageSize Result;
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
- for (auto& Kv : m_Projects)
- {
- const Ref<Project>& Project = Kv.second;
- Result.DiskSize += Project->TotalSize();
- }
- }
- return Result;
-}
-
-Ref<ProjectStore::Project>
-ProjectStore::OpenProject(std::string_view ProjectId)
-{
- {
- RwLock::SharedLockScope _(m_ProjectsLock);
-
- auto ProjIt = m_Projects.find(std::string{ProjectId});
-
- if (ProjIt != m_Projects.end())
- {
- return ProjIt->second;
- }
- }
-
- RwLock::ExclusiveLockScope _(m_ProjectsLock);
-
- std::filesystem::path BasePath = BasePathForProject(ProjectId);
-
- if (Project::Exists(BasePath))
- {
- try
- {
- ZEN_INFO("opening project {} @ {}", ProjectId, BasePath);
-
- Ref<Project>& Prj =
- m_Projects
- .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
- .first->second;
- Prj->Identifier = ProjectId;
- Prj->Read();
- return Prj;
- }
- catch (std::exception& e)
- {
- ZEN_WARN("failed to open {} @ {} ({})", ProjectId, BasePath, e.what());
- m_Projects.erase(std::string{ProjectId});
- }
- }
-
- return {};
-}
-
-Ref<ProjectStore::Project>
-ProjectStore::NewProject(std::filesystem::path BasePath,
- std::string_view ProjectId,
- std::string_view RootDir,
- std::string_view EngineRootDir,
- std::string_view ProjectRootDir,
- std::string_view ProjectFilePath)
-{
- RwLock::ExclusiveLockScope _(m_ProjectsLock);
-
- Ref<Project>& Prj =
- m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
- .first->second;
- Prj->Identifier = ProjectId;
- Prj->RootDir = RootDir;
- Prj->EngineRootDir = EngineRootDir;
- Prj->ProjectRootDir = ProjectRootDir;
- Prj->ProjectFilePath = ProjectFilePath;
- Prj->Write();
-
- return Prj;
-}
-
-bool
-ProjectStore::DeleteProject(std::string_view ProjectId)
-{
- ZEN_INFO("deleting project {}", ProjectId);
-
- RwLock::ExclusiveLockScope ProjectsLock(m_ProjectsLock);
-
- auto ProjIt = m_Projects.find(std::string{ProjectId});
-
- if (ProjIt == m_Projects.end())
- {
- return true;
- }
-
- std::filesystem::path DeletePath;
- bool Success = ProjIt->second->PrepareForDelete(DeletePath);
-
- if (!Success)
- {
- return false;
- }
- m_Projects.erase(ProjIt);
- ProjectsLock.ReleaseNow();
-
- if (!DeletePath.empty())
- {
- DeleteDirectories(DeletePath);
- }
- return true;
-}
-
-bool
-ProjectStore::Exists(std::string_view ProjectId)
-{
- return Project::Exists(BasePathForProject(ProjectId));
-}
-
-CbArray
-ProjectStore::GetProjectsList()
-{
- using namespace std::literals;
-
- DiscoverProjects();
-
- CbWriter Response;
- Response.BeginArray();
-
- IterateProjects([&Response](ProjectStore::Project& Prj) {
- Response.BeginObject();
- Response << "Id"sv << Prj.Identifier;
- Response << "RootDir"sv << Prj.RootDir.string();
- Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
- Response << "EngineRootDir"sv << Prj.EngineRootDir;
- Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
- Response.EndObject();
- });
- Response.EndArray();
- return Response.Save().AsArray();
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload)
-{
- using namespace std::literals;
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Project files request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Project files for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- CbObjectWriter Response;
- Response.BeginArray("files"sv);
-
- FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
- Response.BeginObject();
- Response << "id"sv << Id;
- Response << "clientpath"sv << ClientPath;
- if (!FilterClient)
- {
- Response << "serverpath"sv << ServerPath;
- }
- Response.EndObject();
- });
-
- Response.EndArray();
- OutPayload = Response.Save();
- return {HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunkInfo(const std::string_view ProjectId,
- const std::string_view OplogId,
- const std::string_view ChunkId,
- CbObject& OutPayload)
-{
- using namespace std::literals;
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
- if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
- {
- return {HttpResponseCode::BadRequest,
- fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)};
- }
-
- const Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = FoundLog->FindChunk(Obj);
- if (!Chunk)
- {
- return {HttpResponseCode::NotFound, {}};
- }
-
- uint64_t ChunkSize = Chunk.GetSize();
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- bool IsCompressed = CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize);
- ZEN_ASSERT(IsCompressed);
- ChunkSize = RawSize;
- }
-
- CbObjectWriter Response;
- Response << "size"sv << ChunkSize;
- OutPayload = Response.Save();
- return {HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunkRange(const std::string_view ProjectId,
- const std::string_view OplogId,
- const std::string_view ChunkId,
- uint64_t Offset,
- uint64_t Size,
- ZenContentType AcceptType,
- IoBuffer& OutChunk)
-{
- bool IsOffset = Offset != 0 || Size != ~(0ull);
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
- {
- return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)};
- }
-
- const Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = FoundLog->FindChunk(Obj);
- if (!Chunk)
- {
- return {HttpResponseCode::NotFound, {}};
- }
-
- OutChunk = Chunk;
- HttpContentType ContentType = Chunk.GetContentType();
-
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
- ZEN_ASSERT(!Compressed.IsNull());
-
- if (IsOffset)
- {
- if ((Offset + Size) > RawSize)
- {
- Size = RawSize - Offset;
- }
-
- if (AcceptType == HttpContentType::kBinary)
- {
- OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kBinary);
- }
- else
- {
- // Value will be a range of compressed blocks that covers the requested range
- // The client will have to compensate for any offsets that do not land on an even block size multiple
- OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kCompressedBinary);
- }
- }
- else
- {
- if (AcceptType == HttpContentType::kBinary)
- {
- OutChunk = Compressed.Decompress().AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kBinary);
- }
- else
- {
- OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kCompressedBinary);
- }
- }
- }
- else if (IsOffset)
- {
- if ((Offset + Size) > Chunk.GetSize())
- {
- Size = Chunk.GetSize() - Offset;
- }
- OutChunk = IoBuffer(std::move(Chunk), Offset, Size);
- OutChunk.SetContentType(ContentType);
- }
-
- return {HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::GetChunk(const std::string_view ProjectId,
- const std::string_view OplogId,
- const std::string_view Cid,
- ZenContentType AcceptType,
- IoBuffer& OutChunk)
-{
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- if (Cid.length() != IoHash::StringLength)
- {
- return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, Cid)};
- }
-
- const IoHash Hash = IoHash::FromHexString(Cid);
- OutChunk = m_CidStore.FindChunkByCid(Hash);
-
- if (!OutChunk)
- {
- return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)};
- }
-
- if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
- OutChunk = Compressed.Decompress().AsIoBuffer();
- OutChunk.SetContentType(ZenContentType::kBinary);
- }
- else
- {
- OutChunk.SetContentType(ZenContentType::kCompressedBinary);
- }
- return {HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::PutChunk(const std::string_view ProjectId,
- const std::string_view OplogId,
- const std::string_view Cid,
- ZenContentType ContentType,
- IoBuffer&& Chunk)
-{
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- if (Cid.length() != IoHash::StringLength)
- {
- return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)};
- }
-
- const IoHash Hash = IoHash::FromHexString(Cid);
-
- if (ContentType != HttpContentType::kCompressedBinary)
- {
- return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)};
- }
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
- if (RawHash != Hash)
- {
- return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)};
- }
-
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash);
- return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse)
-{
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
-
- if (!Oplog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- CbObject ContainerObject = LoadCompactBinaryObject(Payload);
- if (!ContainerObject)
- {
- return {HttpResponseCode::BadRequest, "Invalid payload format"};
- }
-
- CidStore& ChunkStore = m_CidStore;
- RwLock AttachmentsLock;
- std::unordered_set<IoHash, IoHash::Hasher> Attachments;
-
- auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
- auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- if (BlockHash != IoHash::Zero)
- {
- Attachments.insert(BlockHash);
- }
- else
- {
- Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
- }
- };
- auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
- RwLock::ExclusiveLockScope _(AttachmentsLock);
- Attachments.insert(RawHash);
- };
-
- RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
-
- if (RemoteResult.ErrorCode)
- {
- return ConvertResult(RemoteResult);
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
- {
- for (const IoHash& Hash : Attachments)
- {
- ZEN_DEBUG("Need attachment {}", Hash);
- Cbo << Hash;
- }
- }
- Cbo.EndArray(); // "need"
-
- OutResponse = Cbo.Save();
- return {HttpResponseCode::OK, {}};
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::ReadOplog(const std::string_view ProjectId,
- const std::string_view OplogId,
- const HttpServerRequest::QueryParams& Params,
- CbObject& OutResponse)
-{
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
-
- if (!Oplog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- size_t MaxBlockSize = 128u * 1024u * 1024u;
- if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
- {
- if (auto Value = ParseInt<size_t>(Param))
- {
- MaxBlockSize = Value.value();
- }
- }
- size_t MaxChunkEmbedSize = 1024u * 1024u;
- if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
- {
- if (auto Value = ParseInt<size_t>(Param))
- {
- MaxChunkEmbedSize = Value.value();
- }
- }
-
- CidStore& ChunkStore = m_CidStore;
-
- RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
- ChunkStore,
- *Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- false,
- [](CompressedBuffer&&, const IoHash) {},
- [](const IoHash&) {},
- [](const std::unordered_set<IoHash, IoHash::Hasher>) {});
-
- OutResponse = std::move(ContainerResult.ContainerObject);
- return ConvertResult(ContainerResult);
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
-{
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
- }
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
-
- if (!Oplog)
- {
- return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
- }
-
- if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
- m_CidStore.AddChunk(Compressed, AttachmentRawHash);
- ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
- }))
- {
- return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
- }
-
- return {HttpResponseCode::OK, {}};
-}
-
-void
-ProjectStore::Rpc(HttpServerRequest& HttpReq,
- const std::string_view ProjectId,
- const std::string_view OplogId,
- IoBuffer&& Payload,
- AuthMgr& AuthManager)
-{
- using namespace std::literals;
- HttpContentType PayloadContentType = HttpReq.RequestContentType();
- CbPackage Package;
- CbObject Cb;
- switch (PayloadContentType)
- {
- case HttpContentType::kJSON:
- case HttpContentType::kUnknownContentType:
- case HttpContentType::kText:
- {
- std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
- Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
- if (!Cb)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected JSON format");
- }
- }
- break;
- case HttpContentType::kCbObject:
- Cb = LoadCompactBinaryObject(Payload);
- if (!Cb)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected compact binary format");
- }
- break;
- case HttpContentType::kCbPackage:
- Package = ParsePackageMessage(Payload);
- Cb = Package.GetObject();
- if (!Cb)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Content format not supported, expected package message format");
- }
- break;
- default:
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
- }
-
- Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
- }
-
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
-
- if (!Oplog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
- }
-
- std::string_view Method = Cb["method"sv].AsString();
-
- if (Method == "import")
- {
- std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- else if (Method == "export")
- {
- std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- else if (Method == "getchunks")
- {
- CbPackage ResponsePackage;
- {
- CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("chunks"sv);
- for (CbFieldView FieldView : ChunksArray)
- {
- IoHash RawHash = FieldView.AsHash();
- IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
- if (ChunkBuffer)
- {
- ResponseWriter.AddHash(RawHash);
- ResponsePackage.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash));
- }
- }
- ResponseWriter.EndArray();
- ResponsePackage.SetObject(ResponseWriter.Save());
- }
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else if (Method == "putchunks")
- {
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- for (const CbAttachment& Attachment : Attachments)
- {
- IoHash RawHash = Attachment.GetHash();
- CompressedBuffer Compressed = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
- bool Force = Params["force"sv].AsBool(false);
-
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
-
- if (RemoteStoreResult.first == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
- }
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
- Project.Identifier,
- Oplog.OplogId(),
- StoreInfo.Description,
- NiceBytes(MaxBlockSize),
- NiceBytes(MaxChunkEmbedSize));
-
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
- *RemoteStore,
- Oplog,
- MaxBlockSize,
- MaxChunkEmbedSize,
- StoreInfo.CreateBlocks,
- StoreInfo.UseTempBlockFiles,
- Force);
-
- return ConvertResult(Result);
-}
-
-std::pair<HttpResponseCode, std::string>
-ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
-{
- using namespace std::literals;
-
- size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
- size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
- bool Force = Params["force"sv].AsBool(false);
-
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
-
- if (RemoteStoreResult.first == nullptr)
- {
- return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
- }
- std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
-
- ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
- return ConvertResult(Result);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr)
-: m_Log(logging::Get("project"))
-, m_CidStore(Store)
-, m_ProjectStore(Projects)
-, m_StatsService(StatsService)
-, m_AuthMgr(AuthMgr)
-{
- using namespace std::literals;
-
- m_StatsService.RegisterHandler("prj", *this);
-
- m_Router.AddPattern("project", "([[:alnum:]_.]+)");
- m_Router.AddPattern("log", "([[:alnum:]_.]+)");
- m_Router.AddPattern("op", "([[:digit:]]+?)");
- m_Router.AddPattern("chunk", "([[:xdigit:]]{24})");
- m_Router.AddPattern("hash", "([[:xdigit:]]{40})");
-
- m_Router.RegisterRoute(
- "",
- [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "list",
- [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/batch",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- // Parse Request
-
- IoBuffer Payload = HttpReq.ReadPayload();
- BinaryReader Reader(Payload);
-
- struct RequestHeader
- {
- enum
- {
- kMagic = 0xAAAA'77AC
- };
- uint32_t Magic;
- uint32_t ChunkCount;
- uint32_t Reserved1;
- uint32_t Reserved2;
- };
-
- struct RequestChunkEntry
- {
- Oid ChunkId;
- uint32_t CorrelationId;
- uint64_t Offset;
- uint64_t RequestBytes;
- };
-
- if (Payload.Size() <= sizeof(RequestHeader))
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- RequestHeader RequestHdr;
- Reader.Read(&RequestHdr, sizeof RequestHdr);
-
- if (RequestHdr.Magic != RequestHeader::kMagic)
- {
- HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- std::vector<RequestChunkEntry> RequestedChunks;
- RequestedChunks.resize(RequestHdr.ChunkCount);
- Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
-
- // Make Response
-
- struct ResponseHeader
- {
- uint32_t Magic = 0xbada'b00f;
- uint32_t ChunkCount;
- uint32_t Reserved1 = 0;
- uint32_t Reserved2 = 0;
- };
-
- struct ResponseChunkEntry
- {
- uint32_t CorrelationId;
- uint32_t Flags = 0;
- uint64_t ChunkSize;
- };
-
- std::vector<IoBuffer> OutBlobs;
- OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
- for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
- {
- const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
- IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId);
- if (FoundChunk)
- {
- if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
- {
- uint64_t Offset = RequestedChunk.Offset;
- if (Offset > FoundChunk.Size())
- {
- Offset = FoundChunk.Size();
- }
- uint64_t Size = RequestedChunk.RequestBytes;
- if ((Offset + Size) > FoundChunk.Size())
- {
- Size = FoundChunk.Size() - Offset;
- }
- FoundChunk = IoBuffer(FoundChunk, Offset, Size);
- }
- }
- OutBlobs.emplace_back(std::move(FoundChunk));
- }
- uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
- ResponseHeader ResponseHdr;
- ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
- memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
- ResponsePtr += sizeof(ResponseHdr);
- for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
- {
- // const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
- const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
- ResponseChunkEntry ResponseChunk;
- ResponseChunk.CorrelationId = ChunkIndex;
- if (FoundChunk)
- {
- ResponseChunk.ChunkSize = FoundChunk.Size();
- }
- else
- {
- ResponseChunk.ChunkSize = uint64_t(-1);
- }
- memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
- ResponsePtr += sizeof(ResponseChunk);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/files",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- // File manifest fetch, returns the client file list
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
-
- const bool FilterClient = Params.GetValue("filter"sv) == "client"sv;
-
- CbObject ResponsePayload;
- std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload);
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
- }
- else
- {
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/{chunk}/info",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& ChunkId = Req.GetCapture(3);
-
- CbObject ResponsePayload;
- std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload);
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
- }
- else if (Result.first == HttpResponseCode::NotFound)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- }
- else
- {
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/{chunk}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& ChunkId = Req.GetCapture(3);
-
- uint64_t Offset = 0;
- uint64_t Size = ~(0ull);
-
- auto QueryParms = Req.ServerRequest().GetQueryParams();
-
- if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
- {
- if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
- {
- Offset = OffsetVal.value();
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
- }
-
- if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false)
- {
- if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
- {
- Size = SizeVal.value();
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
- }
-
- HttpContentType AcceptType = HttpReq.AcceptContentType();
-
- IoBuffer Chunk;
- std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->GetChunkRange(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk);
- if (Result.first == HttpResponseCode::OK)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType()));
- return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk);
- }
- else if (Result.first == HttpResponseCode::NotFound)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- }
- else
- {
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- },
- HttpVerb::kGet | HttpVerb::kHead);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/{hash}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& Cid = Req.GetCapture(3);
- HttpContentType AcceptType = HttpReq.AcceptContentType();
- HttpContentType RequestType = HttpReq.RequestContentType();
-
- switch (Req.ServerRequest().RequestVerb())
- {
- case HttpVerb::kGet:
- {
- IoBuffer Value;
- std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
-
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
- }
- else if (Result.first == HttpResponseCode::NotFound)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
- }
- else
- {
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- case HttpVerb::kPost:
- {
- std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
- if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
- {
- return HttpReq.WriteResponse(Result.first);
- }
- else
- {
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- }
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/prep",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- // This operation takes a list of referenced hashes and decides which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- for (auto Entry : RequestObject["have"sv])
- {
- const IoHash FileHash = Entry.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- ZEN_DEBUG("prep - NEED: {}", FileHash);
-
- NeedList.push_back(FileHash);
- }
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/new",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
-
- bool IsUsingSalt = false;
- IoHash SaltHash = IoHash::Zero;
-
- if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false)
- {
- const uint32_t Salt = std::stoi(std::string(SaltParam));
- SaltHash = IoHash::HashBuffer(&Salt, sizeof Salt);
- IsUsingSalt = true;
- }
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Oplog = *FoundLog;
-
- IoBuffer Payload = HttpReq.ReadPayload();
-
- // This will attempt to open files which may not exist for the case where
- // the prep step rejected the chunk. This should be fixed since there's
- // a performance cost associated with any file system activity
-
- bool IsValid = true;
- std::vector<IoHash> MissingChunks;
-
- CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
- if (m_CidStore.ContainsChunk(Hash))
- {
- // Return null attachment as we already have it, no point in reading it and storing it again
- return {};
- }
-
- IoHash AttachmentId;
- if (IsUsingSalt)
- {
- IoHash AttachmentSpec[]{SaltHash, Hash};
- AttachmentId = IoHash::HashBuffer(MakeMemoryView(AttachmentSpec));
- }
- else
- {
- AttachmentId = Hash;
- }
-
- std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString();
- if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
- {
- return SharedBuffer(std::move(Data));
- }
- else
- {
- IsValid = false;
- MissingChunks.push_back(Hash);
-
- return {};
- }
- };
-
- CbPackage Package;
-
- if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver))
- {
- std::filesystem::path BadPackagePath =
- Oplog.TempPath() / "bad_packages"sv / fmt::format("session{}_request{}"sv, HttpReq.SessionId(), HttpReq.RequestId());
-
- ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath);
-
- WriteFile(BadPackagePath, Payload);
-
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
- }
-
- if (!IsValid)
- {
- // TODO: emit diagnostics identifying missing chunks
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing chunk reference");
- }
-
- CbObject Core = Package.GetObject();
-
- if (!Core["key"sv])
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified");
- }
-
- // Write core to oplog
-
- const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Package);
-
- if (OpLsn == ProjectStore::Oplog::kInvalidOp)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
-
- HttpReq.WriteResponse(HttpResponseCode::Created);
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/{op}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const std::string& ProjectId = Req.GetCapture(1);
- const std::string& OplogId = Req.GetCapture(2);
- const std::string& OpIdString = Req.GetCapture(3);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Oplog = *FoundLog;
-
- if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
- {
- if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
- {
- CbObject& Op = MaybeOp.value();
- if (Req.ServerRequest().AcceptContentType() == ZenContentType::kCbPackage)
- {
- CbPackage Package;
- Package.SetObject(Op);
-
- Op.IterateAttachments([&](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
-
- // We force this for now as content type is not consistently tracked (will
- // be fixed in CidStore refactor)
- Payload.SetContentType(ZenContentType::kCompressedBinary);
-
- if (Payload)
- {
- switch (Payload.GetContentType())
- {
- case ZenContentType::kCbObject:
- if (CbObject Object = LoadCompactBinaryObject(Payload))
- {
- Package.AddAttachment(CbAttachment(Object));
- }
- else
- {
- // Error - malformed object
-
- ZEN_WARN("malformed object returned for {}", AttachmentHash);
- }
- break;
-
- case ZenContentType::kCompressedBinary:
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)))
- {
- Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
- }
- else
- {
- // Error - not compressed!
-
- ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash);
- }
- break;
-
- default:
- Package.AddAttachment(CbAttachment(SharedBuffer(Payload)));
- break;
- }
- }
- });
-
- return HttpReq.WriteResponse(HttpResponseCode::Accepted, Package);
- }
- else
- {
- // Client cannot accept a package, so we only send the core object
- return HttpReq.WriteResponse(HttpResponseCode::Accepted, Op);
- }
- }
- }
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}",
- [this](HttpRouterRequest& Req) {
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
-
- if (!Project)
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
- }
-
- switch (Req.ServerRequest().RequestVerb())
- {
- case HttpVerb::kGet:
- {
- ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
-
- if (!OplogIt)
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("oplog {} not found in project {}", OplogId, ProjectId));
- }
-
- ProjectStore::Oplog& Log = *OplogIt;
-
- CbObjectWriter Cb;
- Cb << "id"sv << Log.OplogId() << "project"sv << Project->Identifier << "tempdir"sv << Log.TempPath().c_str()
- << "markerpath"sv << Log.MarkerPath().c_str() << "totalsize"sv << Log.TotalSize() << "opcount"
- << Log.OplogCount() << "expired"sv << Log.IsExpired();
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cb.Save());
- }
- break;
-
- case HttpVerb::kPost:
- {
- std::filesystem::path OplogMarkerPath;
- if (CbObject Params = Req.ServerRequest().ReadPayloadObject())
- {
- OplogMarkerPath = Params["gcpath"sv].AsString();
- }
-
- ProjectStore::Oplog* OplogIt = Project->OpenOplog(OplogId);
-
- if (!OplogIt)
- {
- if (!Project->NewOplog(OplogId, OplogMarkerPath))
- {
- // TODO: indicate why the operation failed!
- return Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError);
- }
-
- ZEN_INFO("established oplog '{}/{}', gc marker file at '{}'", ProjectId, OplogId, OplogMarkerPath);
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
- }
-
- // I guess this should ultimately be used to execute RPCs but for now, it
- // does absolutely nothing
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
- }
- break;
-
- case HttpVerb::kDelete:
- {
- ZEN_INFO("deleting oplog '{}/{}'", ProjectId, OplogId);
-
- Project->DeleteOplog(OplogId);
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK);
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete);
-
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/entries",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- CbObjectWriter Response;
-
- if (FoundLog->OplogCount() > 0)
- {
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
-
- if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
- {
- Oid OpKeyId = OpKeyStringAsOId(OpKey);
- std::optional<CbObject> Op = FoundLog->GetOpByKey(OpKeyId);
-
- if (Op.has_value())
- {
- Response << "entry"sv << Op.value();
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- }
- else
- {
- Response.BeginArray("entries"sv);
-
- FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; });
-
- Response.EndArray();
- }
- }
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "{project}",
- [this](HttpRouterRequest& Req) {
- const std::string ProjectId = Req.GetCapture(1);
-
- switch (Req.ServerRequest().RequestVerb())
- {
- case HttpVerb::kPost:
- {
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
- CbObject Params = LoadCompactBinaryObject(Payload);
- std::string_view Id = Params["id"sv].AsString();
- std::string_view Root = Params["root"sv].AsString();
- std::string_view EngineRoot = Params["engine"sv].AsString();
- std::string_view ProjectRoot = Params["project"sv].AsString();
- std::string_view ProjectFilePath = Params["projectfile"sv].AsString();
-
- const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
- m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot, ProjectFilePath);
-
- ZEN_INFO("established project - {} (id: '{}', roots: '{}', '{}', '{}', '{}'{})",
- ProjectId,
- Id,
- Root,
- EngineRoot,
- ProjectRoot,
- ProjectFilePath,
- ProjectFilePath.empty() ? ", project will not be GCd due to empty project file path" : "");
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
- }
- break;
-
- case HttpVerb::kGet:
- {
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
-
- if (!Project)
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
- }
-
- std::vector<std::string> OpLogs = Project->ScanForOplogs();
-
- CbObjectWriter Response;
- Response << "id"sv << Project->Identifier;
- Response << "root"sv << PathToUtf8(Project->RootDir);
- Response << "engine"sv << PathToUtf8(Project->EngineRootDir);
- Response << "project"sv << PathToUtf8(Project->ProjectRootDir);
- Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath);
-
- Response.BeginArray("oplogs"sv);
- for (const std::string& OplogId : OpLogs)
- {
- Response.BeginObject();
- Response << "id"sv << OplogId;
- Response.EndObject();
- }
- Response.EndArray(); // oplogs
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save());
- }
- break;
-
- case HttpVerb::kDelete:
- {
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
-
- if (!Project)
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound,
- HttpContentType::kText,
- fmt::format("project {} not found", ProjectId));
- }
-
- ZEN_INFO("deleting project '{}'", ProjectId);
- if (!m_ProjectStore->DeleteProject(ProjectId))
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Locked,
- HttpContentType::kText,
- fmt::format("project {} is in use", ProjectId));
- }
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NoContent);
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
-
- // Push a oplog container
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/save",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
- }
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
-
- CbObject Response;
- std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- },
- HttpVerb::kPost);
-
- // Pull a oplog container
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/load",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- if (HttpReq.AcceptContentType() != HttpContentType::kCbObject)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
- }
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
-
- CbObject Response;
- std::pair<HttpResponseCode, std::string> Result =
- m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
- }
- if (Result.second.empty())
- {
- return HttpReq.WriteResponse(Result.first);
- }
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
- },
- HttpVerb::kGet);
-
- // Do an rpc style operation on project/oplog
- m_Router.RegisterRoute(
- "{project}/oplog/{log}/rpc",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- IoBuffer Payload = Req.ServerRequest().ReadPayload();
-
- m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "details\\$",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool OpDetails = Params.GetValue("opdetails") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- CSVHeader(Details, AttachmentDetails, CSVWriter);
-
- m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
- Project.IterateOplogs([&](ProjectStore::Oplog& Oplog) {
- Oplog.IterateOplogWithKey(
- [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
- CSVWriteOp(m_CidStore,
- Project.Identifier,
- Oplog.OplogId(),
- Details,
- AttachmentDetails,
- LSN,
- Key,
- Op,
- CSVWriter);
- });
- });
- });
-
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("projects");
- {
- m_ProjectStore->DiscoverProjects();
-
- m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
- std::vector<std::string> OpLogs = Project.ScanForOplogs();
- CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
- });
- }
- Cbo.EndArray();
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "details\\$/{project}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool OpDetails = Params.GetValue("opdetails") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
- if (!FoundProject)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- ProjectStore::Project& Project = *FoundProject.Get();
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- CSVHeader(Details, AttachmentDetails, CSVWriter);
-
- FoundProject->IterateOplogs([&](ProjectStore::Oplog& Oplog) {
- Oplog.IterateOplogWithKey([this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN,
- const Oid& Key,
- CbObject Op) {
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
- });
- });
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- std::vector<std::string> OpLogs = FoundProject->ScanForOplogs();
- Cbo.BeginArray("projects");
- {
- CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
- }
- Cbo.EndArray();
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "details\\$/{project}/{log}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool OpDetails = Params.GetValue("opdetails") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
- if (!FoundProject)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Project& Project = *FoundProject.Get();
- ProjectStore::Oplog& Oplog = *FoundLog;
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- CSVHeader(Details, AttachmentDetails, CSVWriter);
-
- Oplog.IterateOplogWithKey(
- [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](int LSN, const Oid& Key, CbObject Op) {
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
- });
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("oplogs");
- {
- CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
- }
- Cbo.EndArray();
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "details\\$/{project}/{log}/{chunk}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& ChunkId = Req.GetCapture(3);
-
- HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
- bool CSV = Params.GetValue("csv") == "true";
- bool Details = Params.GetValue("details") == "true";
- bool OpDetails = Params.GetValue("opdetails") == "true";
- bool AttachmentDetails = Params.GetValue("attachmentdetails") == "true";
-
- Ref<ProjectStore::Project> FoundProject = m_ProjectStore->OpenProject(ProjectId);
- if (!FoundProject)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- ProjectStore::Oplog* FoundLog = FoundProject->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
- {
- return HttpReq.WriteResponse(
- HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId));
- }
-
- const Oid ObjId = Oid::FromHexString(ChunkId);
- ProjectStore::Project& Project = *FoundProject.Get();
- ProjectStore::Oplog& Oplog = *FoundLog;
-
- int LSN = Oplog.GetOpIndexByKey(ObjId);
- if (LSN == -1)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- std::optional<CbObject> Op = Oplog.GetOpByIndex(LSN);
- if (!Op.has_value())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- if (CSV)
- {
- ExtendableStringBuilder<4096> CSVWriter;
- CSVHeader(Details, AttachmentDetails, CSVWriter);
-
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, ObjId, Op.value(), CSVWriter);
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
- }
- else
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("ops");
- {
- CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN, ObjId, Op.value(), Cbo);
- }
- Cbo.EndArray();
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- },
- HttpVerb::kGet);
-}
-
-HttpProjectService::~HttpProjectService()
-{
- m_StatsService.UnregisterHandler("prj", *this);
-}
-
-const char*
-HttpProjectService::BaseUri() const
-{
- return "/prj/";
-}
-
-void
-HttpProjectService::HandleRequest(HttpServerRequest& Request)
-{
- if (m_Router.HandleRequest(Request) == false)
- {
- ZEN_WARN("No route found for {0}", Request.RelativeUri());
- }
-}
-
-void
-HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
-{
- const GcStorageSize StoreSize = m_ProjectStore->StorageSize();
- const CidStoreSize CidSize = m_CidStore.TotalSize();
-
- CbObjectWriter Cbo;
- Cbo.BeginObject("store");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "disk" << StoreSize.DiskSize;
- Cbo << "memory" << StoreSize.MemorySize;
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
-
- Cbo.BeginObject("cid");
- {
- Cbo.BeginObject("size");
- {
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
- }
- Cbo.EndObject();
- }
- Cbo.EndObject();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-#if ZEN_WITH_TESTS
-
-namespace testutils {
- using namespace std::literals;
-
- std::string OidAsString(const Oid& Id)
- {
- StringBuilder<25> OidStringBuilder;
- Id.ToString(OidStringBuilder);
- return OidStringBuilder.ToString();
- }
-
- CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
- {
- CbPackage Package;
- CbObjectWriter Object;
- Object << "key"sv << OidAsString(Id);
- if (!Attachments.empty())
- {
- Object.BeginArray("bulkdata");
- for (const auto& Attachment : Attachments)
- {
- CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
- Object.BeginObject();
- Object << "id"sv << Attachment.first;
- Object << "type"sv
- << "Standard"sv;
- Object << "data"sv << Attach;
- Object.EndObject();
-
- Package.AddAttachment(Attach);
- }
- Object.EndArray();
- }
- Package.SetObject(Object.Save());
- return Package;
- };
-
- std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
- {
- std::vector<std::pair<Oid, CompressedBuffer>> Result;
- Result.reserve(Sizes.size());
- for (size_t Size : Sizes)
- {
- std::vector<uint8_t> Data;
- Data.resize(Size);
- uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
- for (size_t Idx = 0; Idx < Size / 2; ++Idx)
- {
- DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
- }
- if (Size & 1)
- {
- Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
- }
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
- Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
- }
- return Result;
- }
-
- uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
- {
- if (RawOffset > 0)
- {
- uint64 BlockSize = 0;
- OodleCompressor Compressor;
- OodleCompressionLevel CompressionLevel;
- if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
- {
- return 0;
- }
- return BlockSize > 0 ? RawOffset % BlockSize : 0;
- }
- return 0;
- }
-
-} // namespace testutils
-
-TEST_CASE("project.store.create")
-{
- using namespace std::literals;
-
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::string_view ProjectName("proj1"sv);
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
- std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
- std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
-
- Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / ProjectName,
- ProjectName,
- RootDir.string(),
- EngineRootDir.string(),
- ProjectRootDir.string(),
- ProjectFilePath.string()));
- CHECK(ProjectStore.DeleteProject(ProjectName));
- CHECK(!Project->Exists(BasePath));
-}
-
-TEST_CASE("project.store.lifetimes")
-{
- using namespace std::literals;
-
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
- std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
- std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
-
- Ref<ProjectStore::Project> Project(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- ProjectRootDir.string(),
- ProjectFilePath.string()));
- ProjectStore::Oplog* Oplog = Project->NewOplog("oplog1", {});
- CHECK(Oplog != nullptr);
-
- std::filesystem::path DeletePath;
- CHECK(Project->PrepareForDelete(DeletePath));
- CHECK(!DeletePath.empty());
- CHECK(Project->OpenOplog("oplog1") == nullptr);
- // Oplog is now invalid, but pointer can still be accessed since we store old oplog pointers
- CHECK(Oplog->OplogCount() == 0);
- // Project is still valid since we have a Ref to it
- CHECK(Project->Identifier == "proj1"sv);
-}
-
-TEST_CASE("project.store.gc")
-{
- using namespace std::literals;
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
- std::filesystem::path RootDir = TempDir.Path() / "root";
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
-
- std::filesystem::path Project1RootDir = TempDir.Path() / "game1";
- std::filesystem::path Project1FilePath = TempDir.Path() / "game1" / "game.uproject";
- {
- CreateDirectories(Project1FilePath.parent_path());
- BasicFile ProjectFile;
- ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
- }
-
- std::filesystem::path Project2RootDir = TempDir.Path() / "game2";
- std::filesystem::path Project2FilePath = TempDir.Path() / "game2" / "game.uproject";
- {
- CreateDirectories(Project2FilePath.parent_path());
- BasicFile ProjectFile;
- ProjectFile.Open(Project2FilePath, BasicFile::Mode::kTruncate);
- }
-
- {
- Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- Project1RootDir.string(),
- Project1FilePath.string()));
- ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1", {});
- CHECK(Oplog != nullptr);
-
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
- }
-
- {
- Ref<ProjectStore::Project> Project2(ProjectStore.NewProject(BasePath / "proj2"sv,
- "proj2"sv,
- RootDir.string(),
- EngineRootDir.string(),
- Project2RootDir.string(),
- Project2FilePath.string()));
- ProjectStore::Oplog* Oplog = Project2->NewOplog("oplog1", {});
- CHECK(Oplog != nullptr);
-
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), {}));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{177})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{9123, 383, 590, 96})));
- Oplog->AppendNewOplogEntry(CreateOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{535, 221})));
- }
-
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- ProjectStore.GatherReferences(GcCtx);
- size_t RefCount = 0;
- GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; });
- CHECK(RefCount == 14);
- ProjectStore.CollectGarbage(GcCtx);
- CHECK(ProjectStore.OpenProject("proj1"sv));
- CHECK(ProjectStore.OpenProject("proj2"sv));
- }
-
- std::filesystem::remove(Project1FilePath);
-
- {
- GcContext GcCtx(GcClock::Now() - std::chrono::hours(24));
- ProjectStore.GatherReferences(GcCtx);
- size_t RefCount = 0;
- GcCtx.IterateCids([&RefCount](const IoHash&) { RefCount++; });
- CHECK(RefCount == 7);
- ProjectStore.CollectGarbage(GcCtx);
- CHECK(!ProjectStore.OpenProject("proj1"sv));
- CHECK(ProjectStore.OpenProject("proj2"sv));
- }
-}
-
-TEST_CASE("project.store.partial.read")
-{
- using namespace std::literals;
- using namespace testutils;
-
- ScopedTemporaryDirectory TempDir;
-
- GcManager Gc;
- CidStore CidStore(Gc);
- CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
-
- std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc);
- std::filesystem::path RootDir = TempDir.Path() / "root"sv;
- std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
-
- std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
- std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
- {
- CreateDirectories(Project1FilePath.parent_path());
- BasicFile ProjectFile;
- ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
- }
-
- std::vector<Oid> OpIds;
- OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
- std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
- {
- Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
- "proj1"sv,
- RootDir.string(),
- EngineRootDir.string(),
- Project1RootDir.string(),
- Project1FilePath.string()));
- ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv, {});
- CHECK(Oplog != nullptr);
- Attachments[OpIds[0]] = {};
- Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
- Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99});
- Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
- for (auto It : Attachments)
- {
- Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second));
- }
- }
- {
- IoBuffer Chunk;
- CHECK(ProjectStore
- .GetChunk("proj1"sv,
- "oplog1"sv,
- Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(),
- HttpContentType::kCompressedBinary,
- Chunk)
- .first == HttpResponseCode::OK);
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
- CHECK(RawSize == Attachments[OpIds[1]][0].second.DecodeRawSize());
- }
-
- IoBuffer ChunkResult;
- CHECK(ProjectStore
- .GetChunkRange("proj1"sv,
- "oplog1"sv,
- OidAsString(Attachments[OpIds[2]][1].first),
- 0,
- ~0ull,
- HttpContentType::kCompressedBinary,
- ChunkResult)
- .first == HttpResponseCode::OK);
- CHECK(ChunkResult);
- CHECK(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult)).DecodeRawSize() ==
- Attachments[OpIds[2]][1].second.DecodeRawSize());
-
- IoBuffer PartialChunkResult;
- CHECK(ProjectStore
- .GetChunkRange("proj1"sv,
- "oplog1"sv,
- OidAsString(Attachments[OpIds[2]][1].first),
- 5,
- 1773,
- HttpContentType::kCompressedBinary,
- PartialChunkResult)
- .first == HttpResponseCode::OK);
- CHECK(PartialChunkResult);
- IoHash PartialRawHash;
- uint64_t PartialRawSize;
- CompressedBuffer PartialCompressedResult =
- CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult), PartialRawHash, PartialRawSize);
- CHECK(PartialRawSize >= 1773);
-
- uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
- SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
- SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
- const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
- const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
- CHECK(FullDataPtr[0] == PartialDataPtr[0]);
-}
-
-TEST_CASE("project.store.block")
-{
- using namespace std::literals;
- using namespace testutils;
-
- std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489,
- 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
- 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
-
- std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
- std::vector<SharedBuffer> Chunks;
- Chunks.reserve(AttachmentSizes.size());
- for (const auto& It : AttachmentsWithId)
- {
- Chunks.push_back(It.second.GetCompressed().Flatten());
- }
- CompressedBuffer Block = GenerateBlock(std::move(Chunks));
- IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
- CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
-}
-
-#endif
-
-void
-prj_forcelink()
-{
-}
-
-} // namespace zen