about summary refs log tree commit diff
diff options
context:
space:
mode:
author Joe Kirchoff <[email protected]> 2022-05-27 10:38:36 -0700
committer GitHub <[email protected]> 2022-05-27 10:38:36 -0700
commit 7f790cc5e68d498b3d857a13e40a2c62fca87a6c (patch)
tree 4d3716561ce7ab542313f037d037374c9601a24c
parent Merge pull request #110 from EpicGames/de/out-of-disk-space-error-handling (diff)
download zen-7f790cc5e68d498b3d857a13e40a2c62fca87a6c.tar.xz
zen-7f790cc5e68d498b3d857a13e40a2c62fca87a6c.zip
Horde execute compressed input blobs (#109)
-rw-r--r-- zenserver/upstream/hordecompute.cpp 163
1 files changed, 123 insertions, 40 deletions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 2ec24b303..38c798569 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -157,8 +157,13 @@ namespace detail {
{
ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks();
- std::scoped_lock Lock(m_TaskMutex);
- if (m_PendingTasks.contains(UpstreamData.TaskId))
+
+ bool AlreadyQueued;
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId);
+ }
+ if (AlreadyQueued)
{
// Pending task is already queued, return success
ApplyResult.Success = true;
@@ -171,7 +176,7 @@ namespace detail {
CloudCacheSession StorageSession(m_StorageClient);
{
- CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs);
+ CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds);
ApplyResult.Bytes += Result.Bytes;
ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks();
@@ -182,6 +187,22 @@ namespace detail {
return ApplyResult;
}
UpstreamData.Blobs.clear();
+ UpstreamData.CasIds.clear();
+ }
+
+ {
+ CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids);
+ ApplyResult.Bytes += Result.Bytes;
+ ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks();
+ if (!Result.Success)
+ {
+ ApplyResult.Error = {
+ .ErrorCode = Result.ErrorCode,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"};
+ return ApplyResult;
+ }
+ UpstreamData.Cids.clear();
}
{
@@ -279,9 +300,11 @@ namespace detail {
}
}
- [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
+ [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session,
+ const std::map<IoHash, IoBuffer>& Blobs,
+ const std::set<IoHash>& CasIds)
{
- if (Blobs.size() == 0)
+ if (Blobs.size() == 0 && CasIds.size() == 0)
{
return {.Success = true};
}
@@ -292,6 +315,7 @@ namespace detail {
// Batch check for missing blobs
std::set<IoHash> Keys;
std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
+ Keys.insert(CasIds.begin(), CasIds.end());
CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
@@ -310,7 +334,22 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
- CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash));
+ IoBuffer DataBuffer;
+ if (Blobs.contains(Hash))
+ {
+ DataBuffer = Blobs.at(Hash);
+ }
+ else
+ {
+ DataBuffer = m_CasStore.FindChunk(Hash);
+ if (!DataBuffer)
+ {
+ Log().warn("Put blob FAILED, input chunk '{}' missing", Hash);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"};
+ }
+ }
+
+ CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -326,6 +365,62 @@ namespace detail {
return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
}
+ [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids)
+ { // Asks the session which of Cids it still needs, then puts each needed chunk read from m_CidStore; returns the accumulated Bytes/ElapsedSeconds
+ if (Cids.size() == 0) // Nothing requested: report success without issuing any session call
+ {
+ return {.Success = true};
+ }
+ // Running totals; they are carried into every return value below, success or failure
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ // The existence query and the puts below all use Session.Client().DefaultBlobStoreNamespace()
+ // Batch check for missing compressed blobs
+ CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids);
+ Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}",
+ Cids.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (!ExistsResult.Success)
+ { // Query failed: forward its error, substituting -1 / a generic reason when the result carries none
+ return { // NOTE(review): Success is not set on this path - presumably it defaults to false; confirm against CloudCacheResult
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"};
+ }
+ // Only the hashes listed in ExistsResult.Needs are looked up and uploaded
+ for (const auto& Hash : ExistsResult.Needs)
+ {
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash);
+ if (!DataBuffer) // Chunk not found in m_CidStore: abort the batch; this function does not undo puts made in earlier iterations
+ {
+ Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"};
+ }
+ // The put is keyed by the same hash that was used for the existence query
+ CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
+ Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}",
+ Hash,
+ Result.Bytes,
+ Result.ElapsedSeconds,
+ Result.Success);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success) // Stop at the first failed put; the remaining hashes in Needs are not attempted
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"};
+ }
+ }
+ // Reached only when every hash in Needs was found locally and put successfully
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
[[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
{
if (Objects.size() == 0)
@@ -599,6 +694,8 @@ namespace detail {
{
std::map<IoHash, IoBuffer> Blobs;
std::map<IoHash, CbObject> Objects;
+ std::set<IoHash> CasIds;
+ std::set<IoHash> Cids;
IoHash TaskId;
IoHash RequirementsId;
};
@@ -957,7 +1054,7 @@ namespace detail {
for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
{
CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
{
return false;
}
@@ -966,7 +1063,7 @@ namespace detail {
for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
{
CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
{
return false;
}
@@ -1034,11 +1131,10 @@ namespace detail {
bool AnyErrors = false;
ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+ const IoHash Cid = Field.AsHash();
+ const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- if (!DataBuffer)
+ if (!m_CidStore.ContainsChunk(Cid))
{
Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
AnyErrors = true;
@@ -1050,11 +1146,9 @@ namespace detail {
return;
}
- const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
-
InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = CompressedId;
- Data.Blobs[CompressedId] = std::move(DataBuffer);
+ InputFileHashes[FilePath] = Cid;
+ Data.Cids.insert(Cid);
});
if (AnyErrors)
@@ -1067,7 +1161,7 @@ namespace detail {
const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
- CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
+ CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects);
const IoHash SandboxHash = Sandbox.GetHash();
Data.Objects[SandboxHash] = std::move(Sandbox);
@@ -1118,28 +1212,18 @@ namespace detail {
[[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
std::set<std::filesystem::path>& InputFiles,
std::map<std::filesystem::path, IoHash>& InputFileHashes,
- std::map<IoHash, IoBuffer>& Blobs)
+ std::set<IoHash>& CasIds)
{
- const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
- const IoHash ChunkId = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
- IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
+ const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
+ const IoHash ChunkId = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
- if (!DataBuffer)
+ if (!m_CasStore.ContainsChunk(ChunkId))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
return false;
}
- if (DataBuffer.Size() != Size)
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
- ChunkId,
- DataBuffer.Size(),
- Size);
- return false;
- }
-
if (InputFiles.contains(FilePath))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
@@ -1148,7 +1232,7 @@ namespace detail {
InputFiles.insert(FilePath);
InputFileHashes[FilePath] = ChunkId;
- Blobs[ChunkId] = std::move(DataBuffer);
+ CasIds.insert(ChunkId);
return true;
}
@@ -1204,7 +1288,7 @@ namespace detail {
[[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
const std::map<std::filesystem::path, IoHash>& InputFileHashes,
- const std::map<IoHash, IoBuffer>& Blobs,
+ const std::set<IoHash>& Cids,
std::map<IoHash, CbObject>& Objects)
{
CbObjectWriter DirectoryTreeWriter;
@@ -1214,14 +1298,13 @@ namespace detail {
DirectoryTreeWriter.BeginArray("f"sv);
for (const auto& File : RootDirectory.Files)
{
- const std::filesystem::path FilePath = {RootDirectory.Path / File};
- const IoHash& FileHash = InputFileHashes.at(FilePath);
- const uint64_t FileSize = Blobs.at(FileHash).Size();
+ const std::filesystem::path FilePath = {RootDirectory.Path / File};
+ const IoHash& FileHash = InputFileHashes.at(FilePath);
+ const bool Compressed = Cids.contains(FileHash);
DirectoryTreeWriter.BeginObject();
DirectoryTreeWriter.AddString("n"sv, File);
DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
- DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
- // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
+ DirectoryTreeWriter.AddBool("c"sv, Compressed);
DirectoryTreeWriter.EndObject();
}
DirectoryTreeWriter.EndArray();
@@ -1232,7 +1315,7 @@ namespace detail {
DirectoryTreeWriter.BeginArray("d"sv);
for (const auto& Item : RootDirectory.Directories)
{
- CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
+ CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects);
const IoHash DirectoryHash = Directory.GetHash();
Objects[DirectoryHash] = std::move(Directory);