aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-28 23:34:00 +0100
committerGitHub Enterprise <[email protected]>2024-11-28 23:34:00 +0100
commitb960d42525ed1239dc32175204171880dc9a5403 (patch)
tree39536dca7f15f943ab4d451946874739f38fe940 /src
parentfix oplog index path reading error (#246) (diff)
downloadzen-b960d42525ed1239dc32175204171880dc9a5403.tar.xz
zen-b960d42525ed1239dc32175204171880dc9a5403.zip
make sure we don't throw exception from worker thread (#247)
* Make sure we don't throw exception from worker thread * secure async project flush * secure workspaces * spelling
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient.cpp4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp11
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp751
-rw-r--r--src/zenstore/workspaces.cpp20
4 files changed, 447 insertions, 339 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 87097ca45..9af909fcf 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -387,7 +387,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile
uint64_t RawSize;
if (!CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize))
{
- Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validateion");
+ Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation");
return false;
}
}
@@ -400,7 +400,7 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile
IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer);
if (PayloadHash != ExpectedPayloadHash)
{
- Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validateion");
+ Response.error = cpr::Error(/*CURLE_READ_ERROR*/ 26, "Compressed binary failed validation");
return false;
}
}
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 89fbff06c..1296f1269 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3643,9 +3643,16 @@ ProjectStore::Flush()
for (const Ref<Project>& Project : Projects)
{
WorkLatch.AddCount(1);
- WorkerPool.ScheduleWork([&WorkLatch, Project]() {
+ WorkerPool.ScheduleWork([this, &WorkLatch, Project]() {
auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- Project->Flush();
+ try
+ {
+ Project->Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
+ }
});
}
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 970cb19fd..137db255c 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -732,13 +732,13 @@ BuildContainer(CidStore& ChunkStore,
&RemoteResult,
OptionalContext]() {
auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); });
- try
+ if (IsCancelled(OptionalContext))
{
- if (IsCancelled(OptionalContext))
- {
- return;
- }
+ return;
+ }
+ try
+ {
if (!UploadAttachment->RawPath.empty())
{
const std::filesystem::path& FilePath = UploadAttachment->RawPath;
@@ -1590,62 +1590,70 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
return;
}
- bool IsBlock = false;
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = BlockIt->second;
- IsBlock = true;
- }
- else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
- {
- Payload = LooseTmpFileIt->second(RawHash);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
- return;
- }
- size_t PayloadSize = Payload.GetSize();
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): {}",
- RawHash,
- NiceBytes(PayloadSize),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
- }
- if (IsBlock)
+ try
{
- Info.AttachmentBlocksUploaded.fetch_add(1);
- Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved block attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ bool IsBlock = false;
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = BlockIt->second;
+ IsBlock = true;
+ }
+ else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second(RawHash);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ return;
+ }
+ size_t PayloadSize = Payload.GetSize();
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
+ RawHash,
+ NiceBytes(PayloadSize),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
+ }
+ if (IsBlock)
+ {
+ Info.AttachmentBlocksUploaded.fetch_add(1);
+ Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved block attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
+ else
+ {
+ Info.AttachmentsUploaded.fetch_add(1);
+ Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
+ ZEN_INFO("Saved large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(PayloadSize));
+ }
}
- else
+ catch (const std::exception& Ex)
{
- Info.AttachmentsUploaded.fetch_add(1);
- Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
- ZEN_INFO("Saved large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(PayloadSize));
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("To upload attachment {}", RawHash),
+ Ex.what());
}
- return;
});
}
@@ -1693,44 +1701,57 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
&BulkBlockAttachmentsToUpload,
&Info,
OptionalContext]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- size_t ChunksSize = 0;
- std::vector<SharedBuffer> ChunkBuffers;
- ChunkBuffers.reserve(NeededChunks.size());
- for (const IoHash& Chunk : NeededChunks)
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
{
- auto It = BulkBlockAttachmentsToUpload.find(Chunk);
- ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompositeBuffer ChunkPayload = It->second(It->first);
- if (!ChunkPayload)
+ size_t ChunksSize = 0;
+ std::vector<SharedBuffer> ChunkBuffers;
+ ChunkBuffers.reserve(NeededChunks.size());
+ for (const IoHash& Chunk : NeededChunks)
{
- RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
- fmt::format("Missing chunk {}"sv, Chunk),
- fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
- ChunkBuffers.clear();
- break;
+ auto It = BulkBlockAttachmentsToUpload.find(Chunk);
+ ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
+ CompositeBuffer ChunkPayload = It->second(It->first);
+ if (!ChunkPayload)
+ {
+ RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+ fmt::format("Missing chunk {}"sv, Chunk),
+ fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+ ChunkBuffers.clear();
+ break;
+ }
+ ChunksSize += ChunkPayload.GetSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ }
+ RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
+ NeededChunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ return;
}
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
+ Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
+
+ ZEN_INFO("Saved {} bulk attachments in {} ({})",
+ NeededChunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
+ NiceBytes(ChunksSize));
}
- RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
- if (Result.ErrorCode)
+ catch (const std::exception& Ex)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): {}",
- NeededChunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- return;
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to buck upload {} attachments", NeededChunks.size()),
+ Ex.what());
}
- Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
- Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
-
- ZEN_INFO("Saved {} bulk attachments in {} ({})",
- NeededChunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)),
- NiceBytes(ChunksSize));
});
}
}
@@ -2489,66 +2510,84 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
- if (Result.ErrorCode)
+ try
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): {}",
- Chunks.size(),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ if (Result.ErrorCode)
{
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
+ Chunks.size(),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ }
+ return;
}
- return;
- }
- Info.AttachmentsDownloaded.fetch_add(Chunks.size());
- ZEN_INFO("Loaded {} bulk attachments in {}",
- Chunks.size(),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- if (RemoteResult.IsError())
- {
- return;
- }
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ Info.AttachmentsDownloaded.fetch_add(Chunks.size());
+ ZEN_INFO("Loaded {} bulk attachments in {}",
+ Chunks.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
if (RemoteResult.IsError())
{
return;
}
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ if (!Chunks.empty())
+ {
+ try
+ {
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
+ WriteAttachmentBuffers.reserve(Chunks.size());
+ WriteRawHashes.reserve(Chunks.size());
- if (!Chunks.empty())
- {
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
- WriteAttachmentBuffers.reserve(Chunks.size());
- WriteRawHashes.reserve(Chunks.size());
-
- for (const auto& It : Chunks)
- {
- uint64_t ChunkSize = It.second.GetCompressedSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
- WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(It.first);
- }
- std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+ for (const auto& It : Chunks)
+ {
+ uint64_t ChunkSize = It.second.GetCompressedSize();
+ Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(It.first);
+ }
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
- for (size_t Index = 0; Index < InsertResults.size(); Index++)
- {
- if (InsertResults[Index].New)
- {
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ for (size_t Index = 0; Index < InsertResults.size(); Index++)
+ {
+ if (InsertResults[Index].New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk save {} attachments", Chunks.size()),
+ Ex.what());
+ }
}
- }
- }
- });
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to bulk load {} attachments", Chunks.size()),
+ Ex.what());
+ }
});
return;
}
@@ -2572,122 +2611,142 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
- }
- if (RemoteResult.IsError())
+ try
{
- return;
- }
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
- Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
- Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- BlockHash,
- Chunks = std::move(Chunks),
- Bytes = std::move(BlockResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- ZEN_ASSERT(Bytes.Size() > 0);
- std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
- WantedChunks.reserve(Chunks.size());
- WantedChunks.insert(Chunks.begin(), Chunks.end());
- std::vector<IoBuffer> WriteAttachmentBuffers;
- std::vector<IoHash> WriteRawHashes;
-
- IoHash RawHash;
- uint64_t RawSize;
- SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
- if (!BlockPayload)
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
- {});
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
return;
}
- if (RawHash != BlockHash)
+ if (RemoteResult.IsError())
{
- ReportMessage(OptionalContext, fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
- {});
return;
}
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ Info.AttachmentBlocksDownloaded.fetch_add(1);
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ BlockHash,
+ Chunks = std::move(Chunks),
+ Bytes = std::move(BlockResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ ZEN_ASSERT(Bytes.Size() > 0);
+ std::unordered_set<IoHash, IoHash::Hasher> WantedChunks;
+ WantedChunks.reserve(Chunks.size());
+ WantedChunks.insert(Chunks.begin(), Chunks.end());
+ std::vector<IoBuffer> WriteAttachmentBuffers;
+ std::vector<IoHash> WriteRawHashes;
- bool StoreChunksOK = IterateBlock(
- BlockPayload,
- [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
- const IoHash& AttachmentRawHash) {
- if (WantedChunks.contains(AttachmentRawHash))
+ IoHash RawHash;
+ uint64_t RawSize;
+ SharedBuffer BlockPayload = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize).Decompress();
+ if (!BlockPayload)
{
- WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
- IoHash RawHash;
- uint64_t RawSize;
- ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
- ZEN_ASSERT(RawHash == AttachmentRawHash);
- WriteRawHashes.emplace_back(AttachmentRawHash);
- WantedChunks.erase(AttachmentRawHash);
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash));
+ RemoteResult.SetError(
+ gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash),
+ {});
+ return;
+ }
+ if (RawHash != BlockHash)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Block attachment {} has mismatching raw hash ({})", BlockHash, RawHash),
+ {});
+ return;
}
- });
- if (!StoreChunksOK)
- {
- ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
- fmt::format("Invalid format for block {}", BlockHash),
- {});
- return;
- }
+ bool StoreChunksOK = IterateBlock(
+ BlockPayload,
+ [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
+ const IoHash& AttachmentRawHash) {
+ if (WantedChunks.contains(AttachmentRawHash))
+ {
+ WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer());
+ IoHash RawHash;
+ uint64_t RawSize;
+ ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize));
+ ZEN_ASSERT(RawHash == AttachmentRawHash);
+ WriteRawHashes.emplace_back(AttachmentRawHash);
+ WantedChunks.erase(AttachmentRawHash);
+ }
+ });
- ZEN_ASSERT(WantedChunks.empty());
+ if (!StoreChunksOK)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Block attachment {} has invalid format ({}): {}",
+ BlockHash,
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Invalid format for block {}", BlockHash),
+ {});
+ return;
+ }
- if (!WriteAttachmentBuffers.empty())
- {
- auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (size_t Index = 0; Index < Results.size(); Index++)
- {
- const auto& Result = Results[Index];
- if (Result.New)
+ ZEN_ASSERT(WantedChunks.empty());
+
+ if (!WriteAttachmentBuffers.empty())
{
- Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ auto Results = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (size_t Index = 0; Index < Results.size(); Index++)
+ {
+ const auto& Result = Results[Index];
+ if (Result.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize());
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
}
}
- }
- });
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed save block attachment {}", BlockHash),
+ Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to block attachment {}", BlockHash),
+ Ex.what());
+ }
});
};
@@ -2734,56 +2793,74 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
- uint64_t Unset = (std::uint64_t)-1;
- DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
+ try
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
+ return;
}
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
- if (RemoteResult.IsError())
- {
- return;
- }
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
-
- AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
+ Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
return;
}
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
- if (InsertResult.New)
- {
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
- }
- });
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+
+ AttachmentsWriteLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&AttachmentsWriteLatch,
+ &RemoteResult,
+ &Info,
+ &ChunkStore,
+ RawHash,
+ AttachmentSize,
+ Bytes = std::move(AttachmentResult.Bytes),
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ try
+ {
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Saving attachment {} failed", RawHash),
+ Ex.what());
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Loading attachment {} failed", RawHash),
+ Ex.what());
+ }
});
};
@@ -2912,70 +2989,80 @@ LoadOplog(CidStore& ChunkStore,
}
DechunkLatch.CountDown();
});
- if (RemoteResult.IsError())
- {
- return;
- }
- Stopwatch Timer;
- IoBuffer TmpBuffer;
+ try
{
- BasicFile TmpFile;
- TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
+ if (RemoteResult.IsError())
{
- BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
-
- uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
- BLAKE3Stream HashingStream;
- for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ return;
+ }
+ Stopwatch Timer;
+ IoBuffer TmpBuffer;
+ {
+ BasicFile TmpFile;
+ TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate);
{
- const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
- if (!Chunk)
- {
- ReportMessage(OptionalContext,
- fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ BasicFileWriter TmpWriter(TmpFile, 64u * 1024u);
- // We only add 1 as the resulting missing count will be 1 for the dechunked file
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder();
+ BLAKE3Stream HashingStream;
+ for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
+ {
+ const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ if (!Chunk)
{
- RemoteResult.SetError(
- gsl::narrow<int>(HttpResponseCode::NotFound),
- "Missing chunk",
+ ReportMessage(
+ OptionalContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+
+ // We only add 1 as the resulting missing count will be 1 for the dechunked file
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(
+ gsl::narrow<int>(HttpResponseCode::NotFound),
+ "Missing chunk",
+ fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
+ }
+ return;
+ }
+ CompositeBuffer Decompressed =
+ CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
+ for (const SharedBuffer& Segment : Decompressed.GetSegments())
+ {
+ MemoryView SegmentData = Segment.GetView();
+ HashingStream.Append(SegmentData);
+ TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
+ Offset += SegmentData.GetSize();
}
- return;
- }
- CompositeBuffer Decompressed =
- CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite();
- for (const SharedBuffer& Segment : Decompressed.GetSegments())
- {
- MemoryView SegmentData = Segment.GetView();
- HashingStream.Append(SegmentData);
- TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset);
- Offset += SegmentData.GetSize();
}
+ BLAKE3 RawHash = HashingStream.GetHash();
+ ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
+ UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
+ TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
}
- BLAKE3 RawHash = HashingStream.GetHash();
- ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash));
- UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash);
- TmpWriter.Write(Header.GetData(), Header.GetSize(), 0);
+ TmpFile.Close();
+ TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
+ }
+ CidStore::InsertResult InsertResult =
+ ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
+ Info.AttachmentsStored.fetch_add(1);
}
- TmpFile.Close();
- TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName);
+
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
- CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
- if (InsertResult.New)
+ catch (const std::exception& Ex)
{
- Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize());
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ fmt::format("Failed to dechunck file {}", Chunked.RawHash),
+ Ex.what());
}
-
- ZEN_INFO("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
}
DechunkLatch.CountDown();
diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp
index d30a27e33..be921552a 100644
--- a/src/zenstore/workspaces.cpp
+++ b/src/zenstore/workspaces.cpp
@@ -302,7 +302,14 @@ namespace {
RootDir = Parent / DirectoryName,
RelativeRoot = RelativeRoot.empty() ? DirectoryName : RelativeRoot / DirectoryName]() {
auto _ = MakeGuard([DataPtr]() { DataPtr->WorkLatch.CountDown(); });
- DataPtr->Traverse(RelativeRoot, RootDir);
+ try
+ {
+ DataPtr->Traverse(RelativeRoot, RootDir);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while traversing path {} {}: {}", RelativeRoot, RootDir, Ex.what());
+ }
});
return false;
}
@@ -632,8 +639,15 @@ Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId,
{
WorkLatch.AddCount(1);
WorkerPool.ScheduleWork([&, Index]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
- Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+ try
+ {
+ Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while fetching chunks, chunk {}: {}", ChunkRequests[Index].ChunkId, Ex.what());
+ }
});
}
WorkLatch.CountDown();