about summary refs log tree commit diff
diff options
context:
space:
mode:
author    Dan Engelbrecht <[email protected]> 2024-04-15 09:09:46 +0200
committer GitHub Enterprise <[email protected]> 2024-04-15 09:09:46 +0200
commit    6ff0ea81285ffbe2e5708948d7345eb7af9f22f3 (patch)
tree      dd8ef30c0e8f17e6160c0eef185a796059065f71
parent    Validate input buffer size when trying to parse package message (#47) (diff)
download  zen-6ff0ea81285ffbe2e5708948d7345eb7af9f22f3.tar.xz
          zen-6ff0ea81285ffbe2e5708948d7345eb7af9f22f3.zip
gc v2 disk freed space fix and oplog stats report improvement (#45)
- Bugfix: Correctly calculate size freed/data moved from blocks in GCv2
- Improvement: Reduced details in remote store stats for oplog export/import to user
- Improvement: Transfer speed for oplog export/import is now an overall number rather than average of speed per single request
-rw-r--r--  CHANGELOG.md                                         3
-rw-r--r--  src/zenserver/projectstore/remoteprojectstore.cpp  208
-rw-r--r--  src/zenstore/blockstore.cpp                         25
3 files changed, 149 insertions, 87 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82a9fad26..f51998143 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
##
+- Bugfix: Correctly calculate size freed/data moved from blocks in GCv2
+- Improvement: Reduced details in remote store stats for oplog export/import to user
+- Improvement: Transfer speed for oplog export/import is now an overall number rather than average of speed per single request
- Improvement: add validation of input buffer size when trying to parse package message
- Improvement: avoid doing memcopy when parsing package message
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ba3ff6499..cc9385f5e 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -116,29 +116,31 @@ IsCancelled(JobContext* OptionalContext)
void
ReportRemoteStoreStats(JobContext* OptionalContext,
const RemoteProjectStore::RemoteStoreInfo& RemoteStoreInfo,
- const RemoteProjectStore::Stats& Stats)
+ const RemoteProjectStore::Stats& Stats,
+ uint64_t ElapsedWallTimeMS)
{
ReportMessage(
OptionalContext,
- fmt::format(
- "Remote store stats for '{}'\n"
- "Requests: {} (avg {})\n"
- "Sent: {} (avg {}, {}/s)\n"
- "Received: {} (avg {}, {}/s)\n"
- "Peak: Send {}, Receive {} ({}/s)",
- RemoteStoreInfo.Description,
- Stats.m_RequestCount,
- NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_SentBytes),
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u),
- NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS)
- : 0u),
- NiceBytes(Stats.m_PeakSentBytes),
- NiceBytes(Stats.m_PeakReceivedBytes),
- NiceBytes(Stats.m_PeakBytesPerSec)));
+ fmt::format("Remote store '{}': "
+ "Sent: {} ({}/s) Recv: {} ({}/s)",
+ RemoteStoreInfo.Description,
+ NiceBytes(Stats.m_SentBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(Stats.m_ReceivedBytes),
+ NiceBytes(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000) / ElapsedWallTimeMS) : 0u)));
+ ZEN_INFO("Oplog request count: {}. Average request size: {}. Peak request speed: {}",
+ Stats.m_RequestCount,
+ NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u),
+ NiceBytes(Stats.m_PeakBytesPerSec));
+ ZEN_INFO("Oplog sent request avg: {} ({}/s). Peak: {}",
+ NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u),
+ NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
+ NiceBytes(Stats.m_PeakSentBytes));
+ ZEN_INFO(
+ "Oplog recv request avg: {} ({}/s). Peak: {}",
+ NiceBytes(Stats.m_RequestCount > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u),
+ NiceBytes(Stats.m_RequestTimeNS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 1000000000) / Stats.m_RequestTimeNS) : 0u),
+ NiceBytes(Stats.m_PeakReceivedBytes));
}
bool
@@ -1824,10 +1826,15 @@ SaveOplog(CidStore& ChunkStore,
std::vector<Block> KnownBlocks;
+ uint64_t TransferWallTimeMS = 0;
+
if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
{
ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
+ Stopwatch LoadBaseContainerTimer;
RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
+ TransferWallTimeMS += LoadBaseContainerTimer.GetElapsedTimeMs();
+
if (BaseContainerResult.ErrorCode != static_cast<int>(HttpResponseCode::NoContent))
{
if (BaseContainerResult.ErrorCode)
@@ -1943,8 +1950,9 @@ SaveOplog(CidStore& ChunkStore,
RemoteStoreInfo.ContainerName,
ChunkCount,
BlockCount));
-
+ Stopwatch SaveContainerTimer;
RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+ TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs();
if (ContainerSaveResult.ErrorCode)
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
@@ -1959,18 +1967,22 @@ SaveOplog(CidStore& ChunkStore,
NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000.0))));
}
- UploadAttachments(NetworkWorkerPool,
- ChunkStore,
- RemoteStore,
- LargeAttachments,
- BlockChunks,
- CreatedBlocks,
- LooseLargeFiles,
- ContainerSaveResult.Needs,
- ForceUpload,
- Info,
- RemoteResult,
- OptionalContext);
+ {
+ Stopwatch UploadAttachmentsTimer;
+ UploadAttachments(NetworkWorkerPool,
+ ChunkStore,
+ RemoteStore,
+ LargeAttachments,
+ BlockChunks,
+ CreatedBlocks,
+ LooseLargeFiles,
+ ContainerSaveResult.Needs,
+ ForceUpload,
+ Info,
+ RemoteResult,
+ OptionalContext);
+ TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs();
+ }
uint32_t Try = 0;
while (!RemoteResult.IsError())
@@ -2027,6 +2039,7 @@ SaveOplog(CidStore& ChunkStore,
ContainerFinalizeResult.Needs.size(),
Try));
+ Stopwatch UploadAttachmentsTimer;
UploadAttachments(NetworkWorkerPool,
ChunkStore,
RemoteStore,
@@ -2039,6 +2052,7 @@ SaveOplog(CidStore& ChunkStore,
Info,
RemoteResult,
OptionalContext);
+ TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs();
}
else
{
@@ -2063,6 +2077,8 @@ SaveOplog(CidStore& ChunkStore,
RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+
ReportMessage(OptionalContext,
fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})",
RemoteStoreInfo.ContainerName,
@@ -2074,8 +2090,6 @@ SaveOplog(CidStore& ChunkStore,
Info.AttachmentsUploaded.load(),
NiceBytes(Info.AttachmentBytesUploaded.load())));
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats());
-
return Result;
};
@@ -2279,11 +2293,16 @@ LoadOplog(CidStore& ChunkStore,
WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+ uint64_t BlockCountToDownload = 0;
RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
+ uint64_t TransferWallTimeMS = 0;
+
+ Stopwatch LoadContainerTimer;
RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+ TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs();
if (LoadContainerResult.ErrorCode)
{
ReportMessage(
@@ -2304,6 +2323,9 @@ LoadOplog(CidStore& ChunkStore,
Latch AttachmentsWorkLatch(1);
std::atomic_size_t AttachmentCount = 0;
+ Stopwatch LoadAttachmentsTimer;
+ std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
+
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
if (ForceDownload)
{
@@ -2321,13 +2343,17 @@ LoadOplog(CidStore& ChunkStore,
&AttachmentsWorkLatch,
&AttachmentCount,
&RemoteResult,
+ &BlockCountToDownload,
&Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext](const IoHash& BlockHash, std::vector<IoHash>&& Chunks) {
if (RemoteResult.IsError())
{
return;
}
+ BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
AttachmentsWorkLatch.AddCount(1);
@@ -2338,6 +2364,8 @@ LoadOplog(CidStore& ChunkStore,
&RemoteResult,
Chunks = std::move(Chunks),
&Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
@@ -2345,6 +2373,8 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
if (Result.ErrorCode)
{
@@ -2392,6 +2422,8 @@ LoadOplog(CidStore& ChunkStore,
&RemoteResult,
Chunks = std::move(Chunks),
&Info,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
IgnoreMissingAttachments,
OptionalContext]() {
auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
@@ -2399,6 +2431,8 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
if (BlockResult.ErrorCode)
{
@@ -2467,6 +2501,8 @@ LoadOplog(CidStore& ChunkStore,
&RemoteResult,
&Attachments,
&AttachmentCount,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
&Info,
IgnoreMissingAttachments,
OptionalContext](const IoHash& RawHash) {
@@ -2481,46 +2517,56 @@ LoadOplog(CidStore& ChunkStore,
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() {
- auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
- {
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
- }
- return;
- }
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_INFO("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
- if (RemoteResult.IsError())
- {
- return;
- }
- Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
- if (InsertResult.New)
+ NetworkWorkerPool.ScheduleWork([&RemoteStore,
+ &ChunkStore,
+ &RemoteResult,
+ &AttachmentsWorkLatch,
+ RawHash,
+ &LoadAttachmentsTimer,
+ &DownloadStartMS,
+ &Info,
+ IgnoreMissingAttachments,
+ OptionalContext]() {
+ auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ uint64_t Unset = (std::uint64_t)-1;
+ DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
+ {
+ ReportMessage(OptionalContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!IgnoreMissingAttachments)
{
- Info.AttachmentBytesStored.fetch_add(AttachmentSize);
- Info.AttachmentsStored.fetch_add(1);
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
}
- });
+ return;
+ }
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
+ Info.AttachmentsDownloaded.fetch_add(1);
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
+ CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+ if (InsertResult.New)
+ {
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
+ Info.AttachmentsStored.fetch_add(1);
+ }
+ });
};
std::vector<ChunkedInfo> FilesToDechunk;
@@ -2544,9 +2590,11 @@ LoadOplog(CidStore& ChunkStore,
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
ReportMessage(OptionalContext,
- fmt::format("Wrote oplog in {}, found {} attachments to download",
+ fmt::format("Wrote oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- Attachments.size()));
+ Attachments.size(),
+ BlockCountToDownload,
+ FilesToDechunk.size()));
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
@@ -2565,6 +2613,12 @@ LoadOplog(CidStore& ChunkStore,
{
ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0);
}
+
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
+ }
+
if (Result.ErrorCode == 0)
{
if (!FilesToDechunk.empty())
@@ -2689,6 +2743,8 @@ LoadOplog(CidStore& ChunkStore,
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
+
ReportMessage(OptionalContext,
fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}",
RemoteStoreInfo.ContainerName,
@@ -2703,8 +2759,6 @@ LoadOplog(CidStore& ChunkStore,
NiceBytes(Info.AttachmentBytesStored.load()),
Info.MissingAttachmentCount.load()));
- ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats());
-
return Result;
}
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index a576ff022..644ddf7b4 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1093,7 +1093,6 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NiceBytes(MovedSize));
});
- uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block
uint32_t NewBlockIndex = 0;
MovedChunksArray MovedChunks;
@@ -1188,8 +1187,11 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
std::sort(SortedChunkIndexes.begin(), SortedChunkIndexes.end(), [&ChunkLocations](size_t Lhs, size_t Rhs) {
return ChunkLocations[Lhs].Offset < ChunkLocations[Rhs].Offset;
});
- BasicFileBuffer SourceFileBuffer(OldBlockFile->GetBasicFile(), Min(65536u, OldBlockSize));
- uint64_t MovedFromBlock = 0;
+ BasicFileBuffer SourceFileBuffer(OldBlockFile->GetBasicFile(), Min(65536u, OldBlockSize));
+
+ uint64_t WriteOffset = m_MaxBlockSize + 1u; // Force detect a new block
+ uint64_t WrittenBytesToBlock = 0;
+ uint64_t MovedFromBlock = 0;
std::vector<uint8_t> Chunk;
for (const size_t& ChunkIndex : SortedChunkIndexes)
{
@@ -1295,19 +1297,22 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
NiceBytes(Space.Free + ReclaimedSpace));
}
NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(65536u, m_MaxBlockSize));
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
+ AddedSize += WrittenBytesToBlock;
+ WrittenBytesToBlock = 0;
+ TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(65536u, m_MaxBlockSize));
}
TargetFileBuffer->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
MovedChunks.push_back(
{ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
- uint64_t WriteEndOffset = WriteOffset + ChunkLocation.Size;
- MovedFromBlock += (WriteEndOffset - WriteOffset);
- WriteOffset = RoundUp(WriteEndOffset, PayloadAlignment);
- AddedSize += Chunk.size();
+ WrittenBytesToBlock = WriteOffset + ChunkLocation.Size;
+
+ MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset;
+ WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
}
+ AddedSize += WrittenBytesToBlock;
ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}",
LogPrefix,
KeepChunkIndexes.size(),