diff options
| author | Dan Engelbrecht <[email protected]> | 2022-09-08 20:29:31 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-09-08 20:29:31 +0200 |
| commit | 2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b (patch) | |
| tree | 137f0df49510fc91963c6c00079d5b3a756eac94 | |
| parent | fixup partial record detection (diff) | |
| parent | 0.1.4 (diff) | |
| download | zen-2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b.tar.xz zen-2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b.zip | |
Merge remote-tracking branch 'origin/main' into de/new-upstream-api-with-separate-policy
| -rw-r--r-- | CHANGELOG.md | 4 | ||||
| -rw-r--r-- | VERSION.txt | 2 | ||||
| -rw-r--r-- | zenhttp/httpasio.cpp | 12 | ||||
| -rw-r--r-- | zenhttp/httpsys.cpp | 12 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 22 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 8 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 34 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 68 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 2 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 211 | ||||
| -rw-r--r-- | zenstore/cas.cpp | 2 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 2 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 2 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 10 |
16 files changed, 114 insertions, 280 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b3378372e..ee7d6d5f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## -- Change: Bumped ZEN_SCHEMA_VERSION - this will invalidate intire local cache when deployed +- Change: Bumped ZEN_SCHEMA_VERSION - this will invalidate entire local cache when deployed - Change: Make CAS storage an hidden implementation detail of CidStore, we no longer hash and do mapping to compressed hash when storing cache values - Feature: Extended zen print command to also handle CbPackage and CompressedBuffer format payloads - Feature: Added /prj/{project}/oplog/{log}/{op} endpoint to allow retrieval of an op entry by LSN. Supports returning CbObject or CbPackage format payloads @@ -8,11 +8,13 @@ - Improvement: Frontend: simplified content-type logic - Improvement: Improved message indicating no GC is scheduled - Improvement: Implement proper GetCacheValues upstream path +- Improvement: Demote a number of ZEN_ERROR log calls for problems that are recoverable and handled - Bugfix: Fixed issue in CbPackage marshaling of local reference - Bugfix: Fix crash when switching Zen upstream configured via DNS when one endpoint becomes unresposive - Bugfix: Fixed issue where projects would not be discovered via DiscoverProjects due to use of stem() vs filename() - Bugfix: Use "\\\\?\\" prefixed paths on Windows and fix hardcoded path delimiters (UE-141222) - Bugfix: Safer detection of html folder when running non-bundled executable +- Bugfix: Use "application/x-jupiter-inline" to fetch GetCacheValues from Horde (UE-162151) - Sentry: Added logging of sentry_init error code - Sentry: Attach log file to Sentry error reports - Sentry: Capture capture error/critical log statements as errors in Sentry diff --git a/VERSION.txt b/VERSION.txt index c812ecb80..446ba66e7 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -0.1.4-pre26
\ No newline at end of file +0.1.4
\ No newline at end of file diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp index e4b85710d..39a363711 100644 --- a/zenhttp/httpasio.cpp +++ b/zenhttp/httpasio.cpp @@ -403,7 +403,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] } else { - ZEN_ERROR("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); return OnError(); } } @@ -441,7 +441,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu { if (Ec) { - ZEN_ERROR("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); + ZEN_WARN("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); OnError(); } else @@ -987,10 +987,10 @@ struct HttpAcceptor m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable { if (Ec) { - ZEN_ERROR("asio async_accept, connection failed to '{}:{}' reason '{}'", - m_Acceptor.local_endpoint().address().to_string(), - m_Acceptor.local_endpoint().port(), - Ec.message()); + ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'", + m_Acceptor.local_endpoint().address().to_string(), + m_Acceptor.local_endpoint().port(), + Ec.message()); } else { diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index 19dba126a..926e6b09f 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -598,10 +598,10 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) CancelThreadpoolIo(Iocp); - ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: '{}', request id: {}", - GetSystemErrorAsString(SendResult), - HttpReq->pRawUrl, - HttpReq->RequestId); + ZEN_WARN("failed to send HTTP response (error: '{}'), request URL: '{}', request id: {}", + GetSystemErrorAsString(SendResult), + HttpReq->pRawUrl, + HttpReq->RequestId); ErrorCode = MakeErrorCode(SendResult); } @@ -1134,7 +1134,7 @@ HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler return true; } - ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message()); + ZEN_WARN("IssueRequest() failed: '{}'", ErrorCode.message()); } catch (std::exception& Ex) { @@ -1463,7 +1463,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode) ErrorCode = MakeErrorCode(HttpApiResult); - ZEN_ERROR("HttpReceiveHttpRequest failed, error: '{}'", ErrorCode.message()); + ZEN_WARN("HttpReceiveHttpRequest failed, error: '{}'", ErrorCode.message()); return; } diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 76a8707c5..e25759192 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1132,7 +1132,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (!BadKeys.empty()) { - ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); + ZEN_WARN("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName); if (Ctx.RunRecovery()) { diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index bcc9d434b..e49c05f4c 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -291,18 +291,16 @@ private: std::atomic_uint64_t m_TotalSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(); - uint64_t ReadLog(uint64_t LogPosition); - uint64_t MigrateLegacyData(bool CleanSource); - void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); - static bool Delete(std::filesystem::path BucketDir); - void SaveManifest(); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); + void SaveManifest(); // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index e42704ccf..93276f029 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -784,7 +784,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId) } catch (std::exception& ex) { - ZEN_ERROR("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); + ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what()); m_Oplogs.erase(std::string{OplogId}); } @@ -1582,7 +1582,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) std::filesystem::path BadPackagePath = Oplog.TempPath() / "bad_packages" / fmt::format("session{}_request{}", HttpReq.SessionId(), HttpReq.RequestId()); - ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath); + ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); zen::WriteFile(BadPackagePath, Payload); @@ -1667,7 +1667,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { // Error - malformed object - ZEN_ERROR("malformed object returned for {}", AttachmentHash); + ZEN_WARN("malformed object returned for {}", AttachmentHash); } break; @@ -1680,7 +1680,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { // Error - not compressed! - ZEN_ERROR("invalid compressed binary returned for {}", AttachmentHash); + ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash); } break; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 65fa1da92..b82290f3d 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -181,6 +181,40 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K } CloudCacheResult +CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash) +{ + ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = GetSession(); + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); + Session.SetOption(cpr::Body{}); + + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + OutPayloadHash = IoHash::FromHexString(Response.header["X-Jupiter-InlinePayloadHash"]); + + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + +CloudCacheResult CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("HordeClient::GetObject"); diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 4e5e38d6f..88ab77247 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -99,6 +99,7 @@ public: CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash); PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 3b532959b..fbf8593a6 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -527,15 +527,35 @@ namespace detail { if (!Result.Error) { std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero); - Payload = BlobResult.Response; + IoHash PayloadHash; + const CloudCacheResult BlobResult = + Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); + Payload = BlobResult.Response; AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); - if (Payload && IsCompressedBinary(Payload.GetContentType())) + if (Payload) { - Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (IsCompressedBinary(Payload.GetContentType())) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + } + else + { + Compressed = CompressedBuffer::Compress(SharedBuffer(Payload)); + IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + if (RawHash != PayloadHash) + { + ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header", + Namespace, + Request.Key.Bucket, + Request.Key.Hash.ToHexString(), + RawHash.ToHexString(), + PayloadHash.ToHexString()); + Compressed.Reset(); + } + } } } @@ -1712,10 +1732,10 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } } } @@ -1793,10 +1813,10 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + ZEN_WARN("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); @@ -1868,10 +1888,10 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); @@ -1921,10 +1941,10 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + ZEN_WARN("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } } } @@ -1983,10 +2003,10 @@ public: { Stats.CacheErrorCount.Increment(1); - ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); + ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } RemainingKeys = std::move(Missing); diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 2effa4e68..9e15b0c64 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -668,7 +668,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All); ValidationResult != CbValidateError::None) { - ZEN_ERROR("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); + ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult)); WipeState = true; WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult)); diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index 88592d785..38371124d 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -718,217 +718,6 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations, } } -bool -BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations, - const std::filesystem::path& SourceBlockFilePath, - const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - size_t PayloadAlignment, - bool CleanSource, - const SplitCallback& Callback) -{ - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error); - if (Error) - { - ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message()); - return false; - } - - if (Space.Free < MaxBlockSize) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}", - BlocksBasePath, - MaxBlockSize, - NiceBytes(Space.Free)); - return false; - } - - size_t TotalSize = 0; - for (const BlockStoreLocation& Location : ChunkLocations) - { - TotalSize += Location.Size; - } - size_t ChunkCount = ChunkLocations.size(); - uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount); - uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize; - if (MaxRequiredBlockCount > MaxBlockCount) - { - ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}", - BlocksBasePath, - MaxRequiredBlockCount, - MaxBlockCount); - return false; - } - - constexpr const uint64_t DiskReserve = 1ul << 28; - - if (CleanSource) - { - if (Space.Free < (MaxBlockSize + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - BlocksBasePath, - NiceBytes(MaxBlockSize + DiskReserve), - NiceBytes(Space.Free)); - return false; - } - } - else - { - if (Space.Free < (RequiredDiskSpace + DiskReserve)) - { - ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})", - BlocksBasePath, - NiceBytes(RequiredDiskSpace + DiskReserve), - NiceBytes(Space.Free)); - return false; - } - } - - uint32_t WriteBlockIndex = 0; - while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - - BasicFile BlockFile; - BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead); - - if (CleanSource && (MaxRequiredBlockCount < 2)) - { - MovedChunksArray Chunks; - Chunks.reserve(ChunkCount); - for (size_t Index = 0; Index < ChunkCount; ++Index) - { - const BlockStoreLocation& ChunkLocation = ChunkLocations[Index]; - Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}}); - } - std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex); - CreateDirectories(BlockPath.parent_path()); - BlockFile.Close(); - std::filesystem::rename(SourceBlockFilePath, BlockPath); - Callback(Chunks); - return true; - } - - ChunkIndexArray ChunkIndexes; - ChunkIndexes.reserve(ChunkCount); - for (size_t Index = 0; Index < ChunkCount; ++Index) - { - ChunkIndexes.push_back(Index); - } - - std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) { - const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs]; - const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs]; - return LhsLocation.Offset < RhsLocation.Offset; - }); - - uint64_t BlockSize = 0; - uint64_t BlockOffset = 0; - std::vector<BlockStoreLocation> NewLocations; - struct BlockData - { - MovedChunksArray Chunks; - uint64_t BlockOffset; - uint64_t BlockSize; - uint32_t BlockIndex; - }; - - std::vector<BlockData> BlockRanges; - MovedChunksArray Chunks; - BlockRanges.reserve(MaxRequiredBlockCount); - for (const size_t& ChunkIndex : ChunkIndexes) - { - const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex]; - - uint64_t ChunkOffset = LegacyChunkLocation.Offset; - uint64_t ChunkSize = LegacyChunkLocation.Size; - uint64_t ChunkEnd = ChunkOffset + ChunkSize; - - if (BlockSize == 0) - { - BlockOffset = ChunkOffset; - } - if ((ChunkEnd - BlockOffset) > MaxBlockSize) - { - BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}; - BlockRange.Chunks.swap(Chunks); - BlockRanges.push_back(BlockRange); - - WriteBlockIndex++; - while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex))) - { - ++WriteBlockIndex; - } - BlockOffset = ChunkOffset; - BlockSize = 0; - } - BlockSize = RoundUp(BlockSize, PayloadAlignment); - BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize}; - Chunks.push_back({ChunkIndex, ChunkLocation}); - BlockSize = ChunkEnd - BlockOffset; - } - if (BlockSize > 0) - { - BlockRanges.push_back( - {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex}); - } - - Stopwatch WriteBlockTimer; - - std::reverse(BlockRanges.begin(), BlockRanges.end()); - std::vector<std::uint8_t> Buffer(1 << 28); - for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx) - { - const BlockData& BlockRange = BlockRanges[Idx]; - if (Idx > 0) - { - uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize; - uint64_t Completed = BlockOffset + BlockSize - Remaining; - uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed; - - ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}", - BlocksBasePath, - Idx, - BlockRanges.size(), - NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize), - NiceBytes(BlockOffset + BlockSize), - NiceTimeSpanMs(ETA)); - } - - std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex); - BlockStoreFile ChunkBlock(BlockPath); - ChunkBlock.Create(BlockRange.BlockSize); - uint64_t Offset = 0; - while (Offset < BlockRange.BlockSize) - { - uint64_t Size = BlockRange.BlockSize - Offset; - if (Size > Buffer.size()) - { - Size = Buffer.size(); - } - BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset); - ChunkBlock.Write(Buffer.data(), Size, Offset); - Offset += Size; - } - ChunkBlock.Truncate(Offset); - ChunkBlock.Flush(); - - Callback(BlockRange.Chunks); - - if (CleanSource) - { - BlockFile.SetFileSize(BlockRange.BlockOffset); - } - } - BlockFile.Close(); - - return true; -} - const char* BlockStore::GetBlockFileExtension() { diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index 54e8cb11c..f8fc41341 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -148,7 +148,7 @@ CasImpl::OpenOrCreateManifest() } else { - ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); + ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult)); } if (ManifestIsOk) diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index a7fdfa1f5..519478356 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -327,7 +327,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) if (!BadKeys.empty()) { - ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); + ZEN_WARN("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName); if (Ctx.RunRecovery()) { diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 23e3f4cd8..79867dcfa 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -669,7 +669,7 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) if (!BadHashes.empty()) { - ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size()); if (Ctx.RunRecovery()) { diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index fe435cdff..db32efb9f 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -123,7 +123,6 @@ public: typedef std::function<uint64_t()> ClaimDiskReserveCallback; typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback; typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback; - typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback; typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback; void Initialize(const std::filesystem::path& BlocksBasePath, @@ -151,15 +150,6 @@ public: const IterateChunksSmallSizeCallback& SmallSizeCallback, const IterateChunksLargeSizeCallback& LargeSizeCallback); - static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations, - const std::filesystem::path& SourceBlockFilePath, - const std::filesystem::path& BlocksBasePath, - uint64_t MaxBlockSize, - uint64_t MaxBlockCount, - size_t PayloadAlignment, - bool CleanSource, - const SplitCallback& Callback); - static const char* GetBlockFileExtension(); static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex); |