diff options
| author | Martin Ridgers <[email protected]> | 2021-10-14 13:38:07 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-10-14 13:38:07 +0200 |
| commit | a95a8db4056de33a46164c304abeb9b5e915ed5c (patch) | |
| tree | 673dc4165eae327eae4cb68fd1398223c86915cc /zenserver | |
| parent | Use std::fs::path for IoBuffer::MakeFromFile(). (diff) | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-a95a8db4056de33a46164c304abeb9b5e915ed5c.tar.xz zen-a95a8db4056de33a46164c304abeb9b5e915ed5c.zip | |
Merged main
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 35 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 60 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 79 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 5 |
4 files changed, 110 insertions, 69 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 580446473..ccd06b540 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -710,19 +710,38 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c // Move file into place (atomically) - DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); + std::filesystem::path FsPath{DataFilePath.c_str()}; + + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); if (Ec) { - std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); - CreateDirectories(ParentPath); - - DataFile.MoveTemporaryIntoPlace(DataFilePath.c_str(), Ec); + int RetryCount = 3; - if (Ec) + do { - throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); - } + std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); + CreateDirectories(ParentPath); + + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + + if (Ec) + { + std::error_code InnerEc; + const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); + + if (!InnerEc && ExistingFileSize == Value.Value.Size()) + { + // Concurrent write of same value? + return; + } + } + + // Semi arbitrary back-off + zen::Sleep(1000 * RetryCount); + } while (RetryCount--); + + throw std::system_error(Ec, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); } // Update index diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 672ff14d8..0e1ba01cb 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -1232,66 +1232,16 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) const auto& OplogId = Req.GetCapture(2); const auto& HashString = Req.GetCapture(3); - bool IsOffset = false; - uint64_t Offset = 0; - uint64_t Size = ~(0ull); - - auto QueryParms = Req.ServerRequest().GetQueryParams(); - - if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false) - { - if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) - { - Offset = OffsetVal.value(); - IsOffset = true; - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - - if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false) - { - if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) - { - Size = SizeVal.value(); - IsOffset = true; - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - ZEN_DEBUG("oplog hash - {} / {} / {}", ProjectId, OplogId, HashString); - IoHash Hash = IoHash::FromHexString(HashString); - IoBuffer Value = m_CasStore.FindChunk(Hash); - if (!Value) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } + IoHash Hash = IoHash::FromHexString(HashString); - if (IsOffset) + if (IoBuffer Value = m_CasStore.FindChunk(Hash)) { - if (Offset > Value.Size()) - { - Offset = Value.Size(); - } - - if ((Offset + Size) > Value.Size()) - { - Size = Value.Size() - Offset; - } - - // Send only a subset of data - IoBuffer InnerValue(Value, Offset, Size); - - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, InnerValue); + return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); + return HttpReq.WriteResponse(HttpResponseCode::NotFound); }, HttpVerb::kGet); @@ -1325,7 +1275,7 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) if (!m_CasStore.FindChunk(FileHash)) { - ZEN_DEBUG("NEED: {}", FileHash); + ZEN_DEBUG("prep - NEED: {}", FileHash); NeedList.push_back(FileHash); } diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 0397ddaa0..1c43f1bc6 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -552,7 +552,7 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -583,7 +583,7 @@ CloudCacheSession::BlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -614,7 +614,7 @@ CloudCacheSession::CompressedBlobExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -645,7 +645,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) Session.SetOption(cpr::Url{Uri.c_str()}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); if (Response.error) @@ -660,6 +660,77 @@ CloudCacheSession::ObjectExists(const IoHash& Key) return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } +CloudCacheResult +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + + auto& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + std::vector<IoHash> CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 1de417008..9471ef64f 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -94,13 +94,14 @@ public: FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - CloudCacheResult DerivedDataExists(std::string_view BucketId, std::string_view Key); - CloudCacheResult DerivedDataExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); CloudCacheResult BlobExists(const IoHash& Key); CloudCacheResult CompressedBlobExists(const IoHash& Key); CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); + CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); + std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: |