aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-14 13:38:07 +0200
committerMartin Ridgers <[email protected]>2021-10-14 13:38:07 +0200
commita95a8db4056de33a46164c304abeb9b5e915ed5c (patch)
tree673dc4165eae327eae4cb68fd1398223c86915cc /zenserver
parentUse std::fs::path for IoBuffer::MakeFromFile(). (diff)
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-a95a8db4056de33a46164c304abeb9b5e915ed5c.tar.xz
zen-a95a8db4056de33a46164c304abeb9b5e915ed5c.zip
Merged main
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/cache/structuredcachestore.cpp35
-rw-r--r--zenserver/projectstore.cpp60
-rw-r--r--zenserver/upstream/jupiter.cpp79
-rw-r--r--zenserver/upstream/jupiter.h5
4 files changed, 110 insertions, 69 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 580446473..ccd06b540 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -710,19 +710,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
// Move file into place (atomically)
- DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec);
+ std::filesystem::path FsPath{DataFilePath.c_str()};
+
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
if (Ec)
{
- std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path();
- CreateDirectories(ParentPath);
-
- DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec);
+ int RetryCount = 3;
- if (Ec)
+ do
{
- throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath)));
- }
+ std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path();
+ CreateDirectories(ParentPath);
+
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+
+ if (Ec)
+ {
+ std::error_code InnerEc;
+ const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc);
+
+ if (!InnerEc && ExistingFileSize == Value.Value.Size())
+ {
+ // Concurrent write of same value?
+ return;
+ }
+ }
+
+ // Semi arbitrary back-off
+ zen::Sleep(1000 * RetryCount);
+ } while (RetryCount--);
+
+ throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath)));
}
// Update index
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 672ff14d8..0e1ba01cb 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -1232,66 +1232,16 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
const auto& OplogId = Req.GetCapture(2);
const auto& HashString = Req.GetCapture(3);
- bool IsOffset = false;
- uint64_t Offset = 0;
- uint64_t Size = ~(0ull);
-
- auto QueryParms = Req.ServerRequest().GetQueryParams();
-
- if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
- {
- if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
- {
- Offset = OffsetVal.value();
- IsOffset = true;
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
- }
-
- if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false)
- {
- if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
- {
- Size = SizeVal.value();
- IsOffset = true;
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
- }
-
ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString);
- IoHash Hash = IoHash::FromHexString(HashString);
- IoBuffer Value = m_CasStore.FindChunk(Hash);
- if (!Value)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
+ IoHash Hash = IoHash::FromHexString(HashString);
- if (IsOffset)
+ if (IoBuffer Value = m_CasStore.FindChunk(Hash))
{
- if (Offset > Value.Size())
- {
- Offset = Value.Size();
- }
-
- if ((Offset + Size) > Value.Size())
- {
- Size = Value.Size() - Offset;
- }
-
- // Send only a subset of data
- IoBuffer InnerValue(Value, Offset, Size);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
},
HttpVerb::kGet);
@@ -1325,7 +1275,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
if (!m_CasStore.FindChunk(FileHash))
{
- ZEN_DEBUG("NEED: {}", FileHash);
+ ZEN_DEBUG("prep - NEED: {}", FileHash);
NeedList.push_back(FileHash);
}
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 0397ddaa0..1c43f1bc6 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -552,7 +552,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -583,7 +583,7 @@ CloudCacheSession::BlobExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -614,7 +614,7 @@ CloudCacheSession::CompressedBlobExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -645,7 +645,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
Session.SetOption(cpr::Url{Uri.c_str()});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
if (Response.error)
@@ -660,6 +660,77 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
+CloudCacheResult
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+
+ auto& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
+CloudCacheResult
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+
+ auto& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 1de417008..9471ef64f 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -94,13 +94,14 @@ public:
FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
- CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key);
- CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
CloudCacheResult BlobExists(const IoHash& Key);
CloudCacheResult CompressedBlobExists(const IoHash& Key);
CloudCacheResult ObjectExists(const IoHash& Key);
+ CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData);
+ CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
+
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
private: