diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-28 23:34:00 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-28 23:34:00 +0100 |
| commit | b960d42525ed1239dc32175204171880dc9a5403 (patch) | |
| tree | 39536dca7f15f943ab4d451946874739f38fe940 /src | |
| parent | fix oplog index path reading error (#246) (diff) | |
| download | zen-b960d42525ed1239dc32175204171880dc9a5403.tar.xz zen-b960d42525ed1239dc32175204171880dc9a5403.zip | |
make sure we don't throw exception from worker thread (#247)
* Make sure we don't throw exception from worker thread
* secure async project flush
* secure workspaces
* spelling
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 11 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 751 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 20 |
4 files changed, 447 insertions, 339 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 87097ca45..9af909fcf 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -387,7 +387,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile uint64_t RawSize; if (!CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize)) { - Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validateion"); + Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation"); return false; } } @@ -400,7 +400,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer); if (PayloadHash != ExpectedPayloadHash) { - Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validateion"); + Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation"); return false; } } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 89fbff06c..1296f1269 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3643,9 +3643,16 @@ ProjectStore::Flush() for (const Ref<Project>& Project : Projects) { WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([&WorkLatch, Project]() { + WorkerPool.ScheduleWork([this, &WorkLatch, Project]() { auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - Project->Flush(); + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } }); } diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 970cb19fd..137db255c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -732,13 +732,13 @@ BuildContainer(CidStore& ChunkStore, &RemoteResult, OptionalContext]() { auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - try + if (IsCancelled(OptionalContext)) { - if (IsCancelled(OptionalContext)) - { - return; - } + return; + } + try + { if (!UploadAttachment->RawPath.empty()) { const std::filesystem::path& FilePath = UploadAttachment->RawPath; @@ -1590,62 +1590,70 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { return; } - bool IsBlock = false; - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = BlockIt->second; - IsBlock = true; - } - else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) - { - Payload = LooseTmpFileIt->second(RawHash); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); - return; - } - size_t PayloadSize = Payload.GetSize(); - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): {}", - RawHash, - NiceBytes(PayloadSize), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; - } - if (IsBlock) + try { - Info.AttachmentBlocksUploaded.fetch_add(1); - Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved block attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + bool IsBlock = false; + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = BlockIt->second; + IsBlock = true; + } + else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) + { + Payload = LooseTmpFileIt->second(RawHash); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + return; + } + size_t PayloadSize = Payload.GetSize(); + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachment '{}', {} ({}): {}", + RawHash, + NiceBytes(PayloadSize), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; + } + if (IsBlock) + { + Info.AttachmentBlocksUploaded.fetch_add(1); + Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved block attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } + else + { + Info.AttachmentsUploaded.fetch_add(1); + Info.AttachmentBytesUploaded.fetch_add(PayloadSize); + ZEN_INFO("Saved large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(PayloadSize)); + } } - else + catch (const std::exception& Ex) { - Info.AttachmentsUploaded.fetch_add(1); - Info.AttachmentBytesUploaded.fetch_add(PayloadSize); - ZEN_INFO("Saved large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(PayloadSize)); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("To upload attachment {}", RawHash), + Ex.what()); } - return; }); } @@ -1693,44 +1701,57 @@ UploadAttachments(WorkerThreadPool& WorkerPool, &BulkBlockAttachmentsToUpload, &Info, OptionalContext]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - size_t ChunksSize = 0; - std::vector<SharedBuffer> ChunkBuffers; - ChunkBuffers.reserve(NeededChunks.size()); - for (const IoHash& Chunk : NeededChunks) + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try { - auto It = BulkBlockAttachmentsToUpload.find(Chunk); - ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first); - if (!ChunkPayload) + size_t ChunksSize = 0; + std::vector<SharedBuffer> ChunkBuffers; + ChunkBuffers.reserve(NeededChunks.size()); + for (const IoHash& Chunk : NeededChunks) { - RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), - fmt::format("Missing chunk {}"sv, Chunk), - fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); - ChunkBuffers.clear(); - break; + auto It = BulkBlockAttachmentsToUpload.find(Chunk); + ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); + CompositeBuffer ChunkPayload = It->second(It->first); + if (!ChunkPayload) + { + RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), + fmt::format("Missing chunk {}"sv, Chunk), + fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); + ChunkBuffers.clear(); + break; + } + ChunksSize += ChunkPayload.GetSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + } + RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to save attachments with {} chunks ({}): {}", + NeededChunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + return; } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); + Info.AttachmentBytesUploaded.fetch_add(ChunksSize); + + ZEN_INFO("Saved {} bulk attachments in {} ({})", + NeededChunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), + NiceBytes(ChunksSize)); } - RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); - if (Result.ErrorCode) + catch (const std::exception& Ex) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): {}", - NeededChunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - return; + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to buck upload {} attachments", NeededChunks.size()), + Ex.what()); } - Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); - Info.AttachmentBytesUploaded.fetch_add(ChunksSize); - - ZEN_INFO("Saved {} bulk attachments in {} ({})", - NeededChunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)), - NiceBytes(ChunksSize)); }); } } @@ -2489,66 +2510,84 @@ LoadOplog(CidStore& ChunkStore, { return; } - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); - if (Result.ErrorCode) + try { - ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): {}", - Chunks.size(), - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + if (Result.ErrorCode) { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to load attachments with {} chunks ({}): {}", + Chunks.size(), + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (IgnoreMissingAttachments) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + } + return; } - return; - } - Info.AttachmentsDownloaded.fetch_add(Chunks.size()); - ZEN_INFO("Loaded {} bulk attachments in {}", - Chunks.size(), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - if (RemoteResult.IsError()) - { - return; - } - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + Info.AttachmentsDownloaded.fetch_add(Chunks.size()); + ZEN_INFO("Loaded {} bulk attachments in {}", + Chunks.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); if (RemoteResult.IsError()) { return; } + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + if (!Chunks.empty()) + { + try + { + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; + WriteAttachmentBuffers.reserve(Chunks.size()); + WriteRawHashes.reserve(Chunks.size()); - if (!Chunks.empty()) - { - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - WriteAttachmentBuffers.reserve(Chunks.size()); - WriteRawHashes.reserve(Chunks.size()); - - for (const auto& It : Chunks) - { - uint64_t ChunkSize = It.second.GetCompressedSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); - WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(It.first); - } - std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + for (const auto& It : Chunks) + { + uint64_t ChunkSize = It.second.GetCompressedSize(); + Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(It.first); + } + std::vector<CidStore::InsertResult> InsertResults = + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); - for (size_t Index = 0; Index < InsertResults.size(); Index++) - { - if (InsertResults[Index].New) - { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + for (size_t Index = 0; Index < InsertResults.size(); Index++) + { + if (InsertResults[Index].New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk save {} attachments", Chunks.size()), + Ex.what()); + } } - } - } - }); + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to bulk load {} attachments", Chunks.size()), + Ex.what()); + } }); return; } @@ -2572,122 +2611,142 @@ LoadOplog(CidStore& ChunkStore, { return; } - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) - { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; - } - if (RemoteResult.IsError()) + try { - return; - } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); - Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); - Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - BlockHash, - Chunks = std::move(Chunks), - Bytes = std::move(BlockResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - ZEN_ASSERT(Bytes.Size() > 0); - std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; - WantedChunks.reserve(Chunks.size()); - WantedChunks.insert(Chunks.begin(), Chunks.end()); - std::vector<IoBuffer> WriteAttachmentBuffers; - std::vector<IoHash> WriteRawHashes; - - IoHash RawHash; - uint64_t RawSize; - SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress(); - if (!BlockPayload) + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), - {}); + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } return; } - if (RawHash != BlockHash) + if (RemoteResult.IsError()) { - ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), - {}); return; } + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + Info.AttachmentBlocksDownloaded.fetch_add(1); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + BlockHash, + Chunks = std::move(Chunks), + Bytes = std::move(BlockResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + ZEN_ASSERT(Bytes.Size() > 0); + std::unordered_set<IoHash, IoHash::Hasher> WantedChunks; + WantedChunks.reserve(Chunks.size()); + WantedChunks.insert(Chunks.begin(), Chunks.end()); + std::vector<IoBuffer> WriteAttachmentBuffers; + std::vector<IoHash> WriteRawHashes; - bool StoreChunksOK = IterateBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) + IoHash RawHash; + uint64_t RawSize; + SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress(); + if (!BlockPayload) { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); + ReportMessage(OptionalContext, + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash)); + RemoteResult.SetError( + gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash), + {}); + return; + } + if (RawHash != BlockHash) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash)); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash), + {}); + return; } - }); - if (!StoreChunksOK) - { - ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), - fmt::format("Invalid format for block {}", BlockHash), - {}); - return; - } + bool StoreChunksOK = IterateBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }); - ZEN_ASSERT(WantedChunks.empty()); + if (!StoreChunksOK) + { + ReportMessage(OptionalContext, + fmt::format("Block attachment {} has invalid format ({}): {}", + BlockHash, + RemoteResult.GetError(), + RemoteResult.GetErrorReason())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Invalid format for block {}", BlockHash), + {}); + return; + } - if (!WriteAttachmentBuffers.empty()) - { - auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (size_t Index = 0; Index < Results.size(); Index++) - { - const auto& Result = Results[Index]; - if (Result.New) + ZEN_ASSERT(WantedChunks.empty()); + + if (!WriteAttachmentBuffers.empty()) { - Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); - Info.AttachmentsStored.fetch_add(1); + auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (size_t Index = 0; Index < Results.size(); Index++) + { + const auto& Result = Results[Index]; + if (Result.New) + { + Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); + Info.AttachmentsStored.fetch_add(1); + } + } } } - } - }); + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed save block attachment {}", BlockHash), + Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to block attachment {}", BlockHash), + Ex.what()); + } }); }; @@ -2734,56 +2793,74 @@ LoadOplog(CidStore& ChunkStore, { return; } - uint64_t Unset = (std::uint64_t)-1; - DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) + try { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + uint64_t Unset = (std::uint64_t)-1; + DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); + RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ReportMessage(OptionalContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; } - return; - } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_INFO("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); - if (RemoteResult.IsError()) - { - return; - } - Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); - - AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork([&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { - auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); + Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); - if (InsertResult.New) - { - Info.AttachmentBytesStored.fetch_add(AttachmentSize); - Info.AttachmentsStored.fetch_add(1); - } - }); + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); + + AttachmentsWriteLatch.AddCount(1); + WorkerPool.ScheduleWork([&AttachmentsWriteLatch, + &RemoteResult, + &Info, + &ChunkStore, + RawHash, + AttachmentSize, + Bytes = std::move(AttachmentResult.Bytes), + OptionalContext]() { + auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + try + { + CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(AttachmentSize); + Info.AttachmentsStored.fetch_add(1); + } + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Saving attachment {} failed", RawHash), + Ex.what()); + } + }); + } + catch (const std::exception& Ex) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Loading attachment {} failed", RawHash), + Ex.what()); + } }); }; @@ -2912,70 +2989,80 @@ LoadOplog(CidStore& ChunkStore, } DechunkLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } - Stopwatch Timer; - IoBuffer TmpBuffer; + try { - BasicFile TmpFile; - TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); + if (RemoteResult.IsError()) { - BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - - uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - BLAKE3Stream HashingStream; - for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + return; + } + Stopwatch Timer; + IoBuffer TmpBuffer; + { + BasicFile TmpFile; + TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); { - const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); - if (!Chunk) - { - ReportMessage(OptionalContext, - fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); - // We only add 1 as the resulting missing count will be 1 for the dechunked file - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); + BLAKE3Stream HashingStream; + for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) + { + const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; + IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + if (!Chunk) { - RemoteResult.SetError( - gsl::narrow<int>(HttpResponseCode::NotFound), - "Missing chunk", + ReportMessage( + OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + + // We only add 1 as the resulting missing count will be 1 for the dechunked file + Info.MissingAttachmentCount.fetch_add(1); + if (!IgnoreMissingAttachments) + { + RemoteResult.SetError( + gsl::narrow<int>(HttpResponseCode::NotFound), + "Missing chunk", + fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); + } + return; + } + CompositeBuffer Decompressed = + CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); + for (const SharedBuffer& Segment : Decompressed.GetSegments()) + { + MemoryView SegmentData = Segment.GetView(); + HashingStream.Append(SegmentData); + TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); + Offset += SegmentData.GetSize(); } - return; - } - CompositeBuffer Decompressed = - CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); - for (const SharedBuffer& Segment : Decompressed.GetSegments()) - { - MemoryView SegmentData = Segment.GetView(); - HashingStream.Append(SegmentData); - TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); - Offset += SegmentData.GetSize(); } + BLAKE3 RawHash = HashingStream.GetHash(); + ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); + UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); + TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } - BLAKE3 RawHash = HashingStream.GetHash(); - ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); - UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); - TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); + TmpFile.Close(); + TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); + } + CidStore::InsertResult InsertResult = + ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + if (InsertResult.New) + { + Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); + Info.AttachmentsStored.fetch_add(1); } - TmpFile.Close(); - TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); + + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } - CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); - if (InsertResult.New) + catch (const std::exception& Ex) { - Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); - Info.AttachmentsStored.fetch_add(1); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), + fmt::format("Failed to dechunck file {}", Chunked.RawHash), + Ex.what()); } - - ZEN_INFO("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); } DechunkLatch.CountDown(); diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index d30a27e33..be921552a 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -302,7 +302,14 @@ namespace { RootDir = Parent / DirectoryName, RelativeRoot = RelativeRoot.empty() ? DirectoryName : RelativeRoot / DirectoryName]() { auto _ = MakeGuard([DataPtr]() { DataPtr->WorkLatch.CountDown(); }); - DataPtr->Traverse(RelativeRoot, RootDir); + try + { + DataPtr->Traverse(RelativeRoot, RootDir); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while traversing path {} {}: {}", RelativeRoot, RootDir, Ex.what()); + } }); return false; } @@ -632,8 +639,15 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId, { WorkLatch.AddCount(1); WorkerPool.ScheduleWork([&, Index]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + try + { + Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what()); + } }); } WorkLatch.CountDown(); |