aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/filebuildstorage.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/filebuildstorage.cpp')
-rw-r--r--src/zenutil/filebuildstorage.cpp148
1 files changed, 115 insertions, 33 deletions
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index 47a4e1cc4..c2cc5ab3c 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -35,6 +35,27 @@ public:
virtual ~FileBuildStorage() {}
+ virtual CbObject ListNamespaces(bool bRecursive) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
+ ZEN_UNUSED(bRecursive);
+
+ SimulateLatency(0, 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("results");
+ {
+ }
+ Writer.EndArray(); // results
+ Writer.Save();
+ SimulateLatency(Writer.GetSaveSize(), 0);
+ return Writer.Save();
+ }
+
virtual CbObject ListBuilds(CbObject Query) override
{
ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
@@ -66,7 +87,7 @@ public:
}
}
}
- Writer.EndArray(); // builds
+ Writer.EndArray(); // results
Writer.Save();
SimulateLatency(Writer.GetSaveSize(), 0);
return Writer.Save();
@@ -235,7 +256,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
@@ -260,7 +281,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
CreateDirectories(BlockPath.parent_path());
@@ -346,7 +367,7 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
BasicFile File(BlockPath, BasicFile::Mode::kRead);
IoBuffer Payload;
@@ -369,11 +390,11 @@ public:
return IoBuffer{};
}
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(
- const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
@@ -383,20 +404,22 @@ public:
m_Stats.TotalRequestCount++;
const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
struct WorkloadData
{
- std::atomic<uint64_t> BytesRemaining;
- BasicFile BlobFile;
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
+ std::atomic<uint64_t> BytesRemaining;
+ BasicFile BlobFile;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
const uint64_t BlobSize = Workload->BlobFile.FileSize();
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = BlobSize;
std::vector<std::function<void()>> WorkItems;
@@ -410,8 +433,12 @@ public:
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
m_Stats.TotalBytesRead += PartPayload.GetSize();
+ Workload->OnReceive(Offset, PartPayload);
uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
- Workload->Receiver(Offset, PartPayload, ByteRemaning);
+ if (ByteRemaning == Size)
+ {
+ Workload->OnComplete();
+ }
SimulateLatency(Size, PartPayload.GetSize());
});
@@ -423,7 +450,7 @@ public:
return {};
}
- virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
+ virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
{
ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
ZEN_UNUSED(BuildId);
@@ -440,68 +467,107 @@ public:
m_Stats.TotalBytesWritten += MetaData.GetSize();
WriteAsJson(BlockMetaDataPath, MetaData);
SimulateLatency(0, 0);
+ return true;
}
- virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
{
ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(BuildId), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
+ uint64_t FoundCount = 0;
+
DirectoryContent Content;
GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
for (const std::filesystem::path& MetaDataFile : Content.Files)
{
IoHash ChunkHash;
if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
{
std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
- if (std::filesystem::is_regular_file(BlockPath))
+ if (IsFile(BlockPath))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
+ FoundCount++;
+ if (FoundCount == MaxBlockCount)
+ {
+ break;
+ }
}
}
}
- SimulateLatency(0, sizeof(IoHash) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
- virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
- SimulateLatency(0, 0);
+ SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
m_Stats.TotalRequestCount++;
- std::vector<ChunkBlockDescription> Result;
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+
for (const IoHash& BlockHash : BlockHashes)
{
std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
- if (std::filesystem::is_regular_file(MetaDataFile))
+ if (IsFile(MetaDataFile))
{
IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ Writer.AddObject(BlockObject);
}
}
- SimulateLatency(sizeof(BlockHashes) * BlockHashes.size(), sizeof(ChunkBlockDescription) * Result.size());
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
return Result;
}
+ virtual void PutBuildPartStats(const Oid& BuildId,
+ const Oid& BuildPartId,
+ const tsl::robin_map<std::string, double>& FloatStats) override
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("floatStats"sv);
+ for (auto It : FloatStats)
+ {
+ Request.AddFloat(It.first, It.second);
+ }
+ Request.EndObject();
+ CbObject Payload = Request.Save();
+
+ SimulateLatency(Payload.GetSize(), 0);
+
+ const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
+ CreateDirectories(BuildPartStatsDataPath.parent_path());
+
+ TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
+ WriteAsJson(BuildPartStatsDataPath, Payload);
+
+ SimulateLatency(0, 0);
+ }
+
protected:
std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; }
std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; }
@@ -520,6 +586,11 @@ protected:
return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb";
}
+ std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const
+ {
+ return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid());
+ }
+
std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); }
std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const
@@ -587,7 +658,7 @@ protected:
BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
- if (!std::filesystem::is_regular_file(BlockPath))
+ if (!IsFile(BlockPath))
{
NeededAttachments.push_back(AttachmentHash);
}
@@ -608,13 +679,24 @@ protected:
{
return false;
}
- CompositeBuffer Decompressed = ValidateBuffer.DecompressToComposite();
- if (!Decompressed)
+
+ IoHashStream Hash;
+ bool CouldDecompress = ValidateBuffer.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
+ });
+ if (!CouldDecompress)
{
return false;
}
- IoHash Hash = IoHash::HashBuffer(Decompressed);
- if (Hash != RawHash)
+ if (Hash.GetHash() != VerifyHash)
{
return false;
}