diff options
| author | Joe Kirchoff <[email protected]> | 2022-05-27 10:38:36 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-05-27 10:38:36 -0700 |
| commit | 7f790cc5e68d498b3d857a13e40a2c62fca87a6c (patch) | |
| tree | 4d3716561ce7ab542313f037d037374c9601a24c | |
| parent | Merge pull request #110 from EpicGames/de/out-of-disk-space-error-handling (diff) | |
| download | zen-7f790cc5e68d498b3d857a13e40a2c62fca87a6c.tar.xz zen-7f790cc5e68d498b3d857a13e40a2c62fca87a6c.zip | |
Horde execute compressed input blobs (#109)
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 163 |
1 file changed, 123 insertions, 40 deletions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index 2ec24b303..38c798569 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -157,8 +157,13 @@ namespace detail { { ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); - std::scoped_lock Lock(m_TaskMutex); - if (m_PendingTasks.contains(UpstreamData.TaskId)) + + bool AlreadyQueued; + { + std::scoped_lock Lock(m_TaskMutex); + AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId); + } + if (AlreadyQueued) { // Pending task is already queued, return success ApplyResult.Success = true; @@ -171,7 +176,7 @@ namespace detail { CloudCacheSession StorageSession(m_StorageClient); { - CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds); ApplyResult.Bytes += Result.Bytes; ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); @@ -182,6 +187,22 @@ namespace detail { return ApplyResult; } UpstreamData.Blobs.clear(); + UpstreamData.CasIds.clear(); + } + + { + CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = { + .ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? 
std::move(Result.Reason) : "Failed to upload compressed blobs"}; + return ApplyResult; + } + UpstreamData.Cids.clear(); } { @@ -279,9 +300,11 @@ namespace detail { } } - [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, + const std::map<IoHash, IoBuffer>& Blobs, + const std::set<IoHash>& CasIds) { - if (Blobs.size() == 0) + if (Blobs.size() == 0 && CasIds.size() == 0) { return {.Success = true}; } @@ -292,6 +315,7 @@ namespace detail { // Batch check for missing blobs std::set<IoHash> Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + Keys.insert(CasIds.begin(), CasIds.end()); CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", @@ -310,7 +334,22 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash)); + IoBuffer DataBuffer; + if (Blobs.contains(Hash)) + { + DataBuffer = Blobs.at(Hash); + } + else + { + DataBuffer = m_CasStore.FindChunk(Hash); + if (!DataBuffer) + { + Log().warn("Put blob FAILED, input chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"}; + } + } + + CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -326,6 +365,62 @@ namespace detail { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } + [[nodiscard]] CloudCacheResult 
BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids) + { + if (Cids.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing compressed blobs + CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids); + Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}", + Cids.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return { + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash); + if (!DataBuffer) + { + Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"}; + } + + CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); + Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}", + Hash, + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? 
std::move(Result.Reason) : "Failed to put compressed blobs"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) { if (Objects.size() == 0) @@ -599,6 +694,8 @@ namespace detail { { std::map<IoHash, IoBuffer> Blobs; std::map<IoHash, CbObject> Objects; + std::set<IoHash> CasIds; + std::set<IoHash> Cids; IoHash TaskId; IoHash RequirementsId; }; @@ -957,7 +1054,7 @@ namespace detail { for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) { CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) { return false; } @@ -966,7 +1063,7 @@ namespace detail { for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) { CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) { return false; } @@ -1034,11 +1131,10 @@ namespace detail { bool AnyErrors = false; ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - if (!DataBuffer) + if (!m_CidStore.ContainsChunk(Cid)) { Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); AnyErrors = true; @@ -1050,11 +1146,9 @@ namespace detail { return; } - const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = CompressedId; - Data.Blobs[CompressedId] = std::move(DataBuffer); + 
InputFileHashes[FilePath] = Cid; + Data.Cids.insert(Cid); }); if (AnyErrors) @@ -1067,7 +1161,7 @@ namespace detail { const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); - CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects); const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); @@ -1118,28 +1212,18 @@ namespace detail { [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, std::set<std::filesystem::path>& InputFiles, std::map<std::filesystem::path, IoHash>& InputFileHashes, - std::map<IoHash, IoBuffer>& Blobs) + std::set<IoHash>& CasIds) { - const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); - const IoHash ChunkId = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); - if (!DataBuffer) + if (!m_CasStore.ContainsChunk(ChunkId)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); return false; } - if (DataBuffer.Size() != Size) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", - ChunkId, - DataBuffer.Size(), - Size); - return false; - } - if (InputFiles.contains(FilePath)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); @@ -1148,7 +1232,7 @@ namespace detail { InputFiles.insert(FilePath); InputFileHashes[FilePath] = ChunkId; - Blobs[ChunkId] = std::move(DataBuffer); + CasIds.insert(ChunkId); return true; } @@ -1204,7 +1288,7 @@ namespace detail { [[nodiscard]] CbObject 
BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, const std::map<std::filesystem::path, IoHash>& InputFileHashes, - const std::map<IoHash, IoBuffer>& Blobs, + const std::set<IoHash>& Cids, std::map<IoHash, CbObject>& Objects) { CbObjectWriter DirectoryTreeWriter; @@ -1214,14 +1298,13 @@ namespace detail { DirectoryTreeWriter.BeginArray("f"sv); for (const auto& File : RootDirectory.Files) { - const std::filesystem::path FilePath = {RootDirectory.Path / File}; - const IoHash& FileHash = InputFileHashes.at(FilePath); - const uint64_t FileSize = Blobs.at(FileHash).Size(); + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const bool Compressed = Cids.contains(FileHash); DirectoryTreeWriter.BeginObject(); DirectoryTreeWriter.AddString("n"sv, File); DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); - DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size - // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.AddBool("c"sv, Compressed); DirectoryTreeWriter.EndObject(); } DirectoryTreeWriter.EndArray(); @@ -1232,7 +1315,7 @@ namespace detail { DirectoryTreeWriter.BeginArray("d"sv); for (const auto& Item : RootDirectory.Directories) { - CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects); const IoHash DirectoryHash = Directory.GetHash(); Objects[DirectoryHash] = std::move(Directory); |