diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-14 09:50:00 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2025-03-14 09:50:00 +0100 |
| commit | 55c67aec301cfc99178ab54c6366cbc88f35d46a (patch) | |
| tree | 84b4c73220f7dd041763b6d1919eedc8d0b90844 /src/zenserver | |
| parent | Merge remote-tracking branch 'origin/de/zen-service-command' into de/zen-serv... (diff) | |
| parent | fix quoted command lines arguments (#306) (diff) | |
| download | zen-55c67aec301cfc99178ab54c6366cbc88f35d46a.tar.xz zen-55c67aec301cfc99178ab54c6366cbc88f35d46a.zip | |
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/objectstore/objectstore.cpp | 29 | ||||
| -rw-r--r-- | src/zenserver/objectstore/objectstore.h | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 107 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 32 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 67 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 353 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 29 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/workspaces/httpworkspaces.cpp | 18 |
11 files changed, 247 insertions, 402 deletions
diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index b0212ab07..e757ef84e 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -269,9 +269,9 @@ HttpObjectStoreService::Inititalize() m_Router.RegisterRoute( "bucket/{path}", [this](zen::HttpRouterRequest& Request) { - const std::string Path = Request.GetCapture(1); - const auto Sep = Path.find_last_of('.'); - const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; + const std::string_view Path = Request.GetCapture(1); + const auto Sep = Path.find_last_of('.'); + const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; if (IsObject) { @@ -337,18 +337,18 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; if (BucketName.empty()) { return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } - std::string BucketPrefix = Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; if (BucketPrefix.empty()) { const auto QueryParms = Request.ServerRequest().GetQueryParams(); @@ -376,7 +376,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Writer.BeginArray("Contents"sv); } - void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { const fs::path FullPath = Parent / fs::path(File); fs::path RelativePath = fs::relative(FullPath, BucketPath); @@ -450,14 +450,13 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); - const std::string BucketPrefix = - Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; + const std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; const fs::path BucketDir = GetBucketDirectory(BucketName); @@ -554,8 +553,8 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request) { namespace fs = std::filesystem; - const std::string& BucketName = Request.GetCapture(1); - const fs::path BucketDir = GetBucketDirectory(BucketName); + const std::string_view BucketName = Request.GetCapture(1); + const fs::path BucketDir = GetBucketDirectory(BucketName); if (BucketDir.empty()) { diff --git a/src/zenserver/objectstore/objectstore.h b/src/zenserver/objectstore/objectstore.h index c905ceab3..dae979c4c 100644 --- a/src/zenserver/objectstore/objectstore.h +++ b/src/zenserver/objectstore/objectstore.h @@ -36,9 +36,9 @@ private: void Inititalize(); std::filesystem::path GetBucketDirectory(std::string_view BucketName); void CreateBucket(zen::HttpRouterRequest& Request); - void ListBucket(zen::HttpRouterRequest& Request, const std::string& Path); + void ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path); void DeleteBucket(zen::HttpRouterRequest& Request); - void GetObject(zen::HttpRouterRequest& Request, const std::string& Path); + void GetObject(zen::HttpRouterRequest& Request, const std::string_view Path); void PutObject(zen::HttpRouterRequest& Request); ObjectStoreConfig m_Cfg; diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index 302b81729..fbb9bc344 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -3,6 +3,7 @@ #include "buildsremoteprojectstore.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> @@ -114,24 +115,25 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, + const IoHash& RawHash, + ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult PutResult = - Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); + Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); return Result; @@ -139,57 +141,21 @@ public: if (Block.BlockHash == RawHash) { - ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size()); - CbObjectWriter Writer; - Writer.AddHash("rawHash"sv, RawHash); - Writer.BeginArray("rawHashes"sv); - { - for (const IoHash& ChunkHash : Block.ChunkHashes) - { - Writer.AddHash(ChunkHash); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkLengths"); - { - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(ChunkSize); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkOffsets"); - { - ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1); - uint32_t Offset = Block.FirstChunkOffset; - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(Offset); - Offset += ChunkSize; - } - } - Writer.EndArray(); + CbObjectWriter BlockMetaData; + BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - Writer.BeginObject("metadata"sv); - { - Writer.AddString("createdBy", "zenserver"); - } - Writer.EndObject(); - - IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); + IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = - Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); + JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); AddStats(PutMetaResult); RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); if (MetaDataResult.ErrorCode) { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, MetaDataResult.Reason); } @@ -342,51 +308,47 @@ public: { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, Result.Reason); return Result; } - CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response); - if (!BlocksObject) + if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, - m_BuildId, - m_OplogBuildPartId); + m_BuildId); return Result; } - - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); - Result.Blocks.reserve(Blocks.Num()); - for (CbFieldView BlockView : Blocks) + std::optional<std::vector<ChunkBlockDescription>> Blocks = + ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); + if (!Blocks) { - CbObjectView BlockObject = BlockView.AsObjectView(); - IoHash BlockHash = BlockObject["rawHash"sv].AsHash(); - if (BlockHash != IoHash::Zero) - { - CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkView : ChunksArray) - { - ChunkHashes.push_back(ChunkView.AsHash()); - } - Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes}); - } + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + Result.Blocks.reserve(Blocks.value().size()); + for (ChunkBlockDescription& BlockDescription : Blocks.value()) + { + Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, + .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); } return Result; } @@ -395,18 +357,17 @@ public: { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); } diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 0fe739a12..98e292d91 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -106,7 +106,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { Stopwatch Timer; SaveAttachmentResult Result; @@ -192,7 +192,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; Result.Blocks = std::move(KnownBlocks); return Result; diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 0b8e5f13b..47748dd90 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -983,15 +983,19 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); - std::vector<IoHash> ChunkList; - CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); - ChunkList.reserve(HaveList.Num()); - for (auto& Entry : HaveList) + std::vector<IoHash> NeedList; + { - ChunkList.push_back(Entry.AsHash()); - } + eastl::fixed_vector<IoHash, 16> ChunkList; + CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); + ChunkList.reserve(HaveList.Num()); + for (auto& Entry : HaveList) + { + ChunkList.push_back(Entry.AsHash()); + } - std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2)); + NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); + } CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); @@ -1151,7 +1155,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } - std::vector<IoHash> ReferencedChunks; + eastl::fixed_vector<IoHash, 16> ReferencedChunks; Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); // Write core to oplog @@ -1169,7 +1173,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) // Once we stored the op, we no longer need to retain any chunks this op references if (!ReferencedChunks.empty()) { - FoundLog->RemovePendingChunkReferences(ReferencedChunks); + FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); } m_ProjectStats.OpWriteCount++; @@ -1301,9 +1305,9 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string& ProjectId = Req.GetCapture(1); - const std::string& OplogId = Req.GetCapture(2); - const std::string& OpIdString = Req.GetCapture(3); + const std::string_view ProjectId = Req.GetCapture(1); + const std::string_view OplogId = Req.GetCapture(2); + const std::string_view OpIdString = Req.GetCapture(3); Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) @@ -1690,8 +1694,8 @@ HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) using namespace std::literals; - HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string ProjectId = Req.GetCapture(1); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string_view ProjectId = Req.GetCapture(1); switch (HttpReq.RequestVerb()) { diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e906127ff..e5839ad3b 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -92,7 +92,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); @@ -193,7 +193,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 46a236af9..86791e29a 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -423,9 +423,13 @@ ComputeOpKey(const CbObjectView& Op) { using namespace std::literals; - BinaryWriter KeyStream; + eastl::fixed_vector<uint8_t, 256> KeyData; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyStream.Write(Data, Size); }); + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { + auto Begin = reinterpret_cast<const uint8_t*>(Data); + auto End = Begin + Size; + KeyData.insert(KeyData.end(), Begin, End); + }); XXH3_128 KeyHash128; @@ -434,15 +438,15 @@ ComputeOpKey(const CbObjectView& Op) // path but longer paths are evaluated properly. In the future all key lengths // should be evaluated using the proper path, this is a temporary workaround to // maintain compatibility with existing disk state. - if (KeyStream.GetSize() < 240) + if (KeyData.size() < 240) { XXH3_128Stream_deprecated KeyHasher; - KeyHasher.Append(KeyStream.Data(), KeyStream.Size()); + KeyHasher.Append(KeyData.data(), KeyData.size()); KeyHash128 = KeyHasher.GetHash(); } else { - KeyHash128 = XXH3_128::HashMemory(KeyStream.GetView()); + KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); } Oid KeyHash; @@ -2735,7 +2739,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa MissingChunks.reserve(ChunkHashes.size()); for (const IoHash& FileHash : ChunkHashes) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload) + if (!m_CidStore.ContainsChunk(FileHash)) { MissingChunks.push_back(FileHash); } @@ -3359,7 +3363,6 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenOplog"); - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); { RwLock::SharedLockScope ProjectLock(m_ProjectLock); @@ -3367,21 +3370,35 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo if (OplogIt != m_Oplogs.end()) { - if (!VerifyPathOnDisk || Oplog::ExistsAt(OplogBasePath)) + bool ReOpen = false; + + if (VerifyPathOnDisk) { - return OplogIt->second.get(); + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (!Oplog::ExistsAt(OplogBasePath)) + { + // Somebody deleted the oplog on disk behind our back + ProjectLock.ReleaseNow(); + std::filesystem::path DeletePath; + if (!RemoveOplog(OplogId, DeletePath)) + { + ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + } + + ReOpen = true; + } } - // Somebody deleted the oplog on disk behind our back - ProjectLock.ReleaseNow(); - std::filesystem::path DeletePath; - if (!RemoveOplog(OplogId, DeletePath)) + if (!ReOpen) { - ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + return OplogIt->second.get(); } } } + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + RwLock::ExclusiveLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { @@ -5347,7 +5364,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachments */ false, /* AllowChunking*/ false, - [](CompressedBuffer&&, RemoteProjectStore::Block&&) {}, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); @@ -8621,14 +8638,18 @@ TEST_CASE("project.store.block") Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { - Chunks.push_back(std::make_pair(It.second.DecodeRawHash(), - [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer { - return CompositeBuffer(SharedBuffer(Buffer)); - })); - } - RemoteProjectStore::Block Block; - CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block); - CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); + Chunks.push_back( + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Buffer.DecodeRawSize(), Buffer}; + })); + } + ChunkBlockDescription Block; + CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); + uint64_t HeaderSize; + CHECK(IterateChunkBlock( + BlockBuffer.Decompress(), + [](CompressedBuffer&&, const IoHash&) {}, + HeaderSize)); } TEST_CASE("project.store.iterateoplog") diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 0589fdc5f..a7263da83 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -12,8 +12,8 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> -#include <zenstore/chunkedfile.h> #include <zenstore/cidstore.h> +#include <zenutil/chunkedfile.h> #include <zenutil/workerpools.h> #include <unordered_map> @@ -143,7 +143,7 @@ namespace remotestore_impl { NiceBytes(Stats.m_PeakReceivedBytes)); } - size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks) + size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks) { size_t BlockIndex; { @@ -154,63 +154,6 @@ namespace remotestore_impl { return BlockIndex; } - IoBuffer WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path) - { - if (std::filesystem::is_regular_file(Path)) - { - IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path)); - if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize()) - { - ExistingTempFile.SetDeleteOnClose(true); - return ExistingTempFile; - } - } - IoBuffer BlockBuffer; - BasicFile BlockFile; - uint32_t RetriesLeft = 3; - BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); - uint64_t Offset = 0; - { - CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed(); - for (const SharedBuffer& Segment : Compressed.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - static const uint64_t BufferingSize = 256u * 1024u; - - IoBufferFileReference FileRef; - if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef)) - { - ScanFile(FileRef.FileHandle, - FileRef.FileChunkOffset, - FileRef.FileChunkSize, - BufferingSize, - [&BlockFile, &Offset](const void* Data, size_t Size) { - BlockFile.Write(Data, Size, Offset); - Offset += Size; - }); - } - else - { - BlockFile.Write(Segment.GetData(), SegmentSize, Offset); - Offset += SegmentSize; - } - } - } - void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - BlockBuffer.SetDeleteOnClose(true); - return BlockBuffer; - } - RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) { using namespace std::literals; @@ -573,21 +516,23 @@ namespace remotestore_impl { return; } - bool StoreChunksOK = IterateBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }); + uint64_t BlockHeaderSize = 0; + bool StoreChunksOK = IterateChunkBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }, + BlockHeaderSize); if (!StoreChunksOK) { @@ -738,14 +683,14 @@ namespace remotestore_impl { }); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<RemoteProjectStore::Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<ChunkBlockDescription>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork([&Blocks, @@ -764,10 +709,10 @@ namespace remotestore_impl { try { ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - RemoteProjectStore::Block Block; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); @@ -800,8 +745,8 @@ namespace remotestore_impl { struct CreatedBlock { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; }; void UploadAttachments(WorkerThreadPool& WorkerPool, @@ -931,8 +876,8 @@ namespace remotestore_impl { } try { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second.Payload; @@ -1058,7 +1003,7 @@ namespace remotestore_impl { { auto It = BulkBlockAttachmentsToUpload.find(Chunk); ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first); + CompressedBuffer ChunkPayload = It->second(It->first).second; if (!ChunkPayload) { RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), @@ -1067,8 +1012,8 @@ namespace remotestore_impl { ChunkBuffers.clear(); break; } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + ChunksSize += ChunkPayload.GetCompressedSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) @@ -1139,54 +1084,13 @@ namespace remotestore_impl { } } // namespace remotestore_impl -bool -IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) -{ - ZEN_ASSERT(BlockPayload); - if (BlockPayload.GetSize() < 1) - { - return false; - } - - MemoryView BlockView = BlockPayload.GetView(); - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); - uint32_t NumberSize; - uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); - ReadPtr += NumberSize; - std::vector<uint64_t> ChunkSizes; - ChunkSizes.reserve(ChunkCount); - while (ChunkCount--) - { - ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); - ReadPtr += NumberSize; - } - ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); - ZEN_ASSERT(TempBufferLength > 0); - for (uint64_t ChunkSize : ChunkSizes) - { - IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); - IoHash AttachmentRawHash; - uint64_t AttachmentRawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); - - if (!CompressedChunk) - { - ZEN_ERROR("Invalid chunk in block"); - return false; - } - Visitor(std::move(CompressedChunk), AttachmentRawHash); - ReadPtr += ChunkSize; - ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); - } - return true; -}; std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + std::vector<ChunkBlockDescription> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); std::vector<IoHash> BlockHashes; BlockHashes.reserve(BlocksArray.Num()); @@ -1199,11 +1103,11 @@ GetBlockHashesFromOplog(CbObjectView ContainerObject) return BlockHashes; } -std::vector<RemoteProjectStore::Block> +std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; + std::vector<ThinChunkBlockDescription> Result; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); @@ -1226,53 +1130,12 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include { ChunkHashes.push_back(ChunkField.AsHash()); } - Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)}); + Result.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); } } return Result; } -CompressedBuffer -GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock) -{ - const size_t ChunkCount = FetchChunks.size(); - - std::vector<SharedBuffer> ChunkSegments; - ChunkSegments.resize(1); - ChunkSegments.reserve(1 + ChunkCount); - OutBlock.ChunkHashes.reserve(ChunkCount); - OutBlock.ChunkLengths.reserve(ChunkCount); - { - IoBuffer TempBuffer(ChunkCount * 9); - MutableMemoryView View = TempBuffer.GetMutableView(); - uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); - uint8_t* BufferEndPtr = BufferStartPtr; - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - for (const auto& It : FetchChunks) - { - CompositeBuffer Chunk = It.second(It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.GetSegments(); - for (const SharedBuffer& Segment : Segments) - { - ChunkSize += Segment.GetSize(); - ChunkSegments.push_back(Segment); - } - BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); - OutBlock.ChunkHashes.push_back(It.first); - OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); - } - ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); - ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); - } - CompressedBuffer CompressedBlock = - CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); - OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize()); - return CompressedBlock; -} - CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, @@ -1283,9 +1146,9 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<RemoteProjectStore::Block>& KnownBlocks, + const std::vector<ThinChunkBlockDescription>& KnownBlocks, WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, @@ -1307,9 +1170,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<RemoteProjectStore::Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<ChunkBlockDescription> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1525,7 +1388,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } - auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<ThinChunkBlockDescription>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; @@ -1538,14 +1401,14 @@ BuildContainer(CidStore& ChunkStore, for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (Attachments.contains(KnownHash)) { @@ -1586,8 +1449,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (UploadAttachments.erase(KnownHash) == 1) { @@ -1605,10 +1468,7 @@ BuildContainer(CidStore& ChunkStore, }; std::vector<ChunkedFile> ChunkedFiles; - auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash, - IoBuffer& RawData, - const IoBufferFileReference& FileRef, - JobContext*) -> ChunkedFile { + auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { ChunkedFile Chunked; Stopwatch Timer; @@ -1632,12 +1492,12 @@ BuildContainer(CidStore& ChunkStore, return Chunked; }; - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments; - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + RwLock ResolveLock; + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; + std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); @@ -1717,9 +1577,7 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - - IoBuffer TempAttachmentBuffer = - remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), @@ -1730,7 +1588,7 @@ BuildContainer(CidStore& ChunkStore, } else { - size_t RawSize = RawData.GetSize(); + uint64_t RawSize = RawData.GetSize(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); @@ -1738,23 +1596,24 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - IoBuffer TempAttachmentBuffer = remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); + uint64_t CompressedSize = Compressed.GetCompressedSize(); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); - if (Compressed.GetCompressedSize() > MaxChunkEmbedSize) + if (CompressedSize > MaxChunkEmbedSize) { OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { - UploadAttachment->Size = Compressed.GetCompressedSize(); + UploadAttachment->Size = CompressedSize; ResolveLock.WithExclusiveLock( - [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data)); + [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); }); } } @@ -1927,8 +1786,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (ChunkedHashes.erase(KnownHash) == 1) { @@ -1946,7 +1805,7 @@ BuildContainer(CidStore& ChunkStore, Blocks.reserve(ReuseBlockCount); for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) { - Blocks.push_back(KnownBlocks[*It]); + Blocks.push_back({KnownBlocks[*It]}); } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); @@ -2062,9 +1921,9 @@ BuildContainer(CidStore& ChunkStore, { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); } uint64_t NowMS = Timer.GetElapsedTimeMs(); ZEN_INFO("Assembled block {} with {} chunks in {} ({})", @@ -2109,16 +1968,25 @@ BuildContainer(CidStore& ChunkStore, { if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) { - return CompositeBuffer(IoBuffer); - })); + ChunksInBlock.emplace_back(std::make_pair( + RawHash, + [RawSize = It->second.first, + IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer())); + })); LooseUploadAttachments.erase(It); } else { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) { - return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); - })); + ChunksInBlock.emplace_back( + std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); + IoHash _; + uint64_t RawSize = 0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), _, RawSize); + ZEN_ASSERT(Compressed); + return {RawSize, Compressed}; + })); } BlockSize += PayloadSize; @@ -2169,14 +2037,15 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; - ChunksInBlock.emplace_back(std::make_pair( - ChunkHash, - [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) { - return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None) - .GetCompressed(); - })); + ChunksInBlock.emplace_back( + std::make_pair(ChunkHash, + [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( + const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Size, + CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None)}; + })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; if (BuildBlocks) { @@ -2298,9 +2167,9 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); OplogContinerWriter.BeginArray("blocks"sv); { - for (const RemoteProjectStore::Block& B : Blocks) + for (const ChunkBlockDescription& B : Blocks) { - ZEN_ASSERT(!B.ChunkHashes.empty()); + ZEN_ASSERT(!B.ChunkRawHashes.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); @@ -2310,7 +2179,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunkHashes) + for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContinerWriter.AddHash(RawHash); } @@ -2326,7 +2195,7 @@ BuildContainer(CidStore& ChunkStore, { OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunkHashes) + for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContinerWriter.AddBinaryAttachment(RawHash); } @@ -2392,7 +2261,7 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) @@ -2458,13 +2327,13 @@ SaveOplog(CidStore& ChunkStore, std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); try { - IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath); + IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); @@ -2478,8 +2347,8 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); @@ -2512,7 +2381,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found attachment {}", AttachmentHash); }; - std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock; + std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; @@ -2522,7 +2391,7 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<RemoteProjectStore::Block> KnownBlocks; + std::vector<ThinChunkBlockDescription> KnownBlocks; uint64_t TransferWallTimeMS = 0; diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index e05cb9923..1210afc7c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -5,6 +5,8 @@ #include <zencore/jobqueue.h> #include "projectstore.h" +#include <zenutil/chunkblock.h> + #include <unordered_set> namespace zen { @@ -16,14 +18,6 @@ struct ChunkedInfo; class RemoteProjectStore { public: - struct Block - { - IoHash BlockHash; - std::vector<IoHash> ChunkHashes; - std::vector<uint32_t> ChunkLengths; - uint32_t FirstChunkOffset = (uint32_t)-1; - }; - struct Result { int32_t ErrorCode{}; @@ -72,7 +66,7 @@ public: struct GetKnownBlocksResult : public Result { - std::vector<Block> Blocks; + std::vector<ThinChunkBlockDescription> Blocks; }; struct RemoteStoreInfo @@ -101,11 +95,11 @@ public: virtual RemoteStoreInfo GetInfo() const = 0; virtual Stats GetStats() const = 0; - virtual CreateContainerResult CreateContainer() = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0; - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + virtual CreateContainerResult CreateContainer() = 0; + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0; + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; virtual LoadContainerResult LoadContainer() = 0; virtual GetKnownBlocksResult GetKnownBlocks() = 0; @@ -125,7 +119,6 @@ struct RemoteStoreOptions }; typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; -typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc; RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, @@ -137,7 +130,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles); @@ -173,9 +166,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, bool CleanOplog, JobContext* OptionalContext); -CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock); -bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); -std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); +std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); } // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 42519b108..2ebf58a5d 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -93,7 +93,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 2d59c9357..8a4b977ad 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -51,9 +51,9 @@ namespace { WriteWorkspaceConfig(Writer, WorkspaceConfig); if (std::optional<std::vector<Oid>> ShareIds = Workspaces.GetWorkspaceShares(WorkspaceConfig.Id); ShareIds) { - for (const Oid& ShareId : *ShareIds) + Writer.BeginArray("shares"); { - Writer.BeginArray("shares"); + for (const Oid& ShareId : *ShareIds) { if (std::optional<Workspaces::WorkspaceShareConfiguration> WorkspaceShareConfig = Workspaces.GetWorkspaceShareConfiguration(WorkspaceConfig.Id, ShareId); @@ -66,8 +66,8 @@ namespace { Writer.EndObject(); } } - Writer.EndArray(); } + Writer.EndArray(); } } @@ -589,7 +589,7 @@ void HttpWorkspacesService::ShareAliasFilesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -608,7 +608,7 @@ void HttpWorkspacesService::ShareAliasChunkInfoRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -635,7 +635,7 @@ void HttpWorkspacesService::ShareAliasBatchRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -654,7 +654,7 @@ void HttpWorkspacesService::ShareAliasEntriesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -673,7 +673,7 @@ void HttpWorkspacesService::ShareAliasChunkRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -700,7 +700,7 @@ void HttpWorkspacesService::ShareAliasRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, |