aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-09-08 20:29:31 +0200
committerDan Engelbrecht <[email protected]>2022-09-08 20:29:31 +0200
commit2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b (patch)
tree137f0df49510fc91963c6c00079d5b3a756eac94
parentfixup partial record detection (diff)
parent0.1.4 (diff)
downloadzen-2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b.tar.xz
zen-2dfb0d1b896b756cf37bc08bebf5205ce6f0bf9b.zip
Merge remote-tracking branch 'origin/main' into de/new-upstream-api-with-separate-policy
-rw-r--r--CHANGELOG.md4
-rw-r--r--VERSION.txt2
-rw-r--r--zenhttp/httpasio.cpp12
-rw-r--r--zenhttp/httpsys.cpp12
-rw-r--r--zenserver/cache/structuredcachestore.cpp2
-rw-r--r--zenserver/cache/structuredcachestore.h22
-rw-r--r--zenserver/projectstore.cpp8
-rw-r--r--zenserver/upstream/jupiter.cpp34
-rw-r--r--zenserver/upstream/jupiter.h1
-rw-r--r--zenserver/upstream/upstreamcache.cpp68
-rw-r--r--zenserver/zenserver.cpp2
-rw-r--r--zenstore/blockstore.cpp211
-rw-r--r--zenstore/cas.cpp2
-rw-r--r--zenstore/compactcas.cpp2
-rw-r--r--zenstore/filecas.cpp2
-rw-r--r--zenstore/include/zenstore/blockstore.h10
16 files changed, 114 insertions, 280 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b3378372e..ee7d6d5f5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,5 @@
##
-- Change: Bumped ZEN_SCHEMA_VERSION - this will invalidate intire local cache when deployed
+- Change: Bumped ZEN_SCHEMA_VERSION - this will invalidate entire local cache when deployed
- Change: Make CAS storage an hidden implementation detail of CidStore, we no longer hash and do mapping to compressed hash when storing cache values
- Feature: Extended zen print command to also handle CbPackage and CompressedBuffer format payloads
- Feature: Added /prj/{project}/oplog/{log}/{op} endpoint to allow retrieval of an op entry by LSN. Supports returning CbObject or CbPackage format payloads
@@ -8,11 +8,13 @@
- Improvement: Frontend: simplified content-type logic
- Improvement: Improved message indicating no GC is scheduled
- Improvement: Implement proper GetCacheValues upstream path
+- Improvement: Demote a number of ZEN_ERROR log calls for problems that are recoverable and handled
- Bugfix: Fixed issue in CbPackage marshaling of local reference
- Bugfix: Fix crash when switching Zen upstream configured via DNS when one endpoint becomes unresposive
- Bugfix: Fixed issue where projects would not be discovered via DiscoverProjects due to use of stem() vs filename()
- Bugfix: Use "\\\\?\\" prefixed paths on Windows and fix hardcoded path delimiters (UE-141222)
- Bugfix: Safer detection of html folder when running non-bundled executable
+- Bugfix: Use "application/x-jupiter-inline" to fetch GetCacheValues from Horde (UE-162151)
- Sentry: Added logging of sentry_init error code
- Sentry: Attach log file to Sentry error reports
- Sentry: Capture capture error/critical log statements as errors in Sentry
diff --git a/VERSION.txt b/VERSION.txt
index c812ecb80..446ba66e7 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-0.1.4-pre26 \ No newline at end of file
+0.1.4 \ No newline at end of file
diff --git a/zenhttp/httpasio.cpp b/zenhttp/httpasio.cpp
index e4b85710d..39a363711 100644
--- a/zenhttp/httpasio.cpp
+++ b/zenhttp/httpasio.cpp
@@ -403,7 +403,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
}
else
{
- ZEN_ERROR("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
return OnError();
}
}
@@ -441,7 +441,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu
{
if (Ec)
{
- ZEN_ERROR("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
+ ZEN_WARN("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
OnError();
}
else
@@ -987,10 +987,10 @@ struct HttpAcceptor
m_Acceptor.async_accept(SocketRef, [this, Socket = std::move(SocketPtr)](const asio::error_code& Ec) mutable {
if (Ec)
{
- ZEN_ERROR("asio async_accept, connection failed to '{}:{}' reason '{}'",
- m_Acceptor.local_endpoint().address().to_string(),
- m_Acceptor.local_endpoint().port(),
- Ec.message());
+ ZEN_WARN("asio async_accept, connection failed to '{}:{}' reason '{}'",
+ m_Acceptor.local_endpoint().address().to_string(),
+ m_Acceptor.local_endpoint().port(),
+ Ec.message());
}
else
{
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index 19dba126a..926e6b09f 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -598,10 +598,10 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
CancelThreadpoolIo(Iocp);
- ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: '{}', request id: {}",
- GetSystemErrorAsString(SendResult),
- HttpReq->pRawUrl,
- HttpReq->RequestId);
+ ZEN_WARN("failed to send HTTP response (error: '{}'), request URL: '{}', request id: {}",
+ GetSystemErrorAsString(SendResult),
+ HttpReq->pRawUrl,
+ HttpReq->RequestId);
ErrorCode = MakeErrorCode(SendResult);
}
@@ -1134,7 +1134,7 @@ HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler
return true;
}
- ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message());
+ ZEN_WARN("IssueRequest() failed: '{}'", ErrorCode.message());
}
catch (std::exception& Ex)
{
@@ -1463,7 +1463,7 @@ InitialRequestHandler::IssueRequest(std::error_code& ErrorCode)
ErrorCode = MakeErrorCode(HttpApiResult);
- ZEN_ERROR("HttpReceiveHttpRequest failed, error: '{}'", ErrorCode.message());
+ ZEN_WARN("HttpReceiveHttpRequest failed, error: '{}'", ErrorCode.message());
return;
}
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 76a8707c5..e25759192 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -1132,7 +1132,7 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadKeys.empty())
{
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ ZEN_WARN("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
if (Ctx.RunRecovery())
{
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index bcc9d434b..e49c05f4c 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -291,18 +291,16 @@ private:
std::atomic_uint64_t m_TotalSize{};
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile();
- uint64_t ReadLog(uint64_t LogPosition);
- uint64_t MigrateLegacyData(bool CleanSource);
- void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
- static bool Delete(std::filesystem::path BucketDir);
- void SaveManifest();
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile();
+ uint64_t ReadLog(uint64_t LogPosition);
+ void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
+ void SaveManifest();
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index e42704ccf..93276f029 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -784,7 +784,7 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId)
}
catch (std::exception& ex)
{
- ZEN_ERROR("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
+ ZEN_WARN("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
m_Oplogs.erase(std::string{OplogId});
}
@@ -1582,7 +1582,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
std::filesystem::path BadPackagePath =
Oplog.TempPath() / "bad_packages" / fmt::format("session{}_request{}", HttpReq.SessionId(), HttpReq.RequestId());
- ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath);
+ ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath);
zen::WriteFile(BadPackagePath, Payload);
@@ -1667,7 +1667,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
// Error - malformed object
- ZEN_ERROR("malformed object returned for {}", AttachmentHash);
+ ZEN_WARN("malformed object returned for {}", AttachmentHash);
}
break;
@@ -1680,7 +1680,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
// Error - not compressed!
- ZEN_ERROR("invalid compressed binary returned for {}", AttachmentHash);
+ ZEN_WARN("invalid compressed binary returned for {}", AttachmentHash);
}
break;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 65fa1da92..b82290f3d 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -181,6 +181,40 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
}
CloudCacheResult
+CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash)
+{
+ ZEN_TRACE_CPU("HordeClient::GetInlineBlob");
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
+
+ cpr::Session& Session = GetSession();
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
+ Session.SetOption(cpr::Body{});
+
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ OutPayloadHash = IoHash::FromHexString(Response.header["X-Jupiter-InlinePayloadHash"]);
+
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
+CloudCacheResult
CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("HordeClient::GetObject");
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 4e5e38d6f..88ab77247 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -99,6 +99,7 @@ public:
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash);
PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 3b532959b..fbf8593a6 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -527,15 +527,35 @@ namespace detail {
if (!Result.Error)
{
std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace);
- const CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, IoHash::Zero);
- Payload = BlobResult.Response;
+ IoHash PayloadHash;
+ const CloudCacheResult BlobResult =
+ Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash);
+ Payload = BlobResult.Response;
AppendResult(BlobResult, Result);
m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason);
- if (Payload && IsCompressedBinary(Payload.GetContentType()))
+ if (Payload)
{
- Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (IsCompressedBinary(Payload.GetContentType()))
+ {
+ Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ }
+ else
+ {
+ Compressed = CompressedBuffer::Compress(SharedBuffer(Payload));
+ IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ if (RawHash != PayloadHash)
+ {
+ ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header",
+ Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash.ToHexString(),
+ RawHash.ToHexString(),
+ PayloadHash.ToHexString());
+ Compressed.Reset();
+ }
+ }
}
}
@@ -1712,10 +1732,10 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ ZEN_WARN("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
}
}
@@ -1793,10 +1813,10 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ ZEN_WARN("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
RemainingKeys = std::move(Missing);
@@ -1868,10 +1888,10 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
RemainingKeys = std::move(Missing);
@@ -1921,10 +1941,10 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ ZEN_WARN("get cache value FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
}
}
@@ -1983,10 +2003,10 @@ public:
{
Stats.CacheErrorCount.Increment(1);
- ZEN_ERROR("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
- Endpoint->GetEndpointInfo().Url,
- Result.Error.Reason,
- Result.Error.ErrorCode);
+ ZEN_WARN("get cache values(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
}
RemainingKeys = std::move(Missing);
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 2effa4e68..9e15b0c64 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -668,7 +668,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All);
ValidationResult != CbValidateError::None)
{
- ZEN_ERROR("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult));
+ ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult));
WipeState = true;
WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult));
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index 88592d785..38371124d 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -718,217 +718,6 @@ BlockStore::IterateChunks(const std::vector<BlockStoreLocation>& ChunkLocations,
}
}
-bool
-BlockStore::Split(const std::vector<BlockStoreLocation>& ChunkLocations,
- const std::filesystem::path& SourceBlockFilePath,
- const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- size_t PayloadAlignment,
- bool CleanSource,
- const SplitCallback& Callback)
-{
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(BlocksBasePath.parent_path(), Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in {} FAILED, reason: '{}'", BlocksBasePath, Error.message());
- return false;
- }
-
- if (Space.Free < MaxBlockSize)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required disk space {}, free {}",
- BlocksBasePath,
- MaxBlockSize,
- NiceBytes(Space.Free));
- return false;
- }
-
- size_t TotalSize = 0;
- for (const BlockStoreLocation& Location : ChunkLocations)
- {
- TotalSize += Location.Size;
- }
- size_t ChunkCount = ChunkLocations.size();
- uint64_t RequiredDiskSpace = TotalSize + ((PayloadAlignment - 1) * ChunkCount);
- uint64_t MaxRequiredBlockCount = RoundUp(RequiredDiskSpace, MaxBlockSize) / MaxBlockSize;
- if (MaxRequiredBlockCount > MaxBlockCount)
- {
- ZEN_ERROR("legacy store migration from '{}' FAILED, required block count {}, possible {}",
- BlocksBasePath,
- MaxRequiredBlockCount,
- MaxBlockCount);
- return false;
- }
-
- constexpr const uint64_t DiskReserve = 1ul << 28;
-
- if (CleanSource)
- {
- if (Space.Free < (MaxBlockSize + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- BlocksBasePath,
- NiceBytes(MaxBlockSize + DiskReserve),
- NiceBytes(Space.Free));
- return false;
- }
- }
- else
- {
- if (Space.Free < (RequiredDiskSpace + DiskReserve))
- {
- ZEN_INFO("legacy store migration from '{}' aborted, not enough disk space available {} ({})",
- BlocksBasePath,
- NiceBytes(RequiredDiskSpace + DiskReserve),
- NiceBytes(Space.Free));
- return false;
- }
- }
-
- uint32_t WriteBlockIndex = 0;
- while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
-
- BasicFile BlockFile;
- BlockFile.Open(SourceBlockFilePath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
-
- if (CleanSource && (MaxRequiredBlockCount < 2))
- {
- MovedChunksArray Chunks;
- Chunks.reserve(ChunkCount);
- for (size_t Index = 0; Index < ChunkCount; ++Index)
- {
- const BlockStoreLocation& ChunkLocation = ChunkLocations[Index];
- Chunks.push_back({Index, {.BlockIndex = WriteBlockIndex, .Offset = ChunkLocation.Offset, .Size = ChunkLocation.Size}});
- }
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex);
- CreateDirectories(BlockPath.parent_path());
- BlockFile.Close();
- std::filesystem::rename(SourceBlockFilePath, BlockPath);
- Callback(Chunks);
- return true;
- }
-
- ChunkIndexArray ChunkIndexes;
- ChunkIndexes.reserve(ChunkCount);
- for (size_t Index = 0; Index < ChunkCount; ++Index)
- {
- ChunkIndexes.push_back(Index);
- }
-
- std::sort(begin(ChunkIndexes), end(ChunkIndexes), [&](size_t Lhs, size_t Rhs) {
- const BlockStoreLocation& LhsLocation = ChunkLocations[Lhs];
- const BlockStoreLocation& RhsLocation = ChunkLocations[Rhs];
- return LhsLocation.Offset < RhsLocation.Offset;
- });
-
- uint64_t BlockSize = 0;
- uint64_t BlockOffset = 0;
- std::vector<BlockStoreLocation> NewLocations;
- struct BlockData
- {
- MovedChunksArray Chunks;
- uint64_t BlockOffset;
- uint64_t BlockSize;
- uint32_t BlockIndex;
- };
-
- std::vector<BlockData> BlockRanges;
- MovedChunksArray Chunks;
- BlockRanges.reserve(MaxRequiredBlockCount);
- for (const size_t& ChunkIndex : ChunkIndexes)
- {
- const BlockStoreLocation& LegacyChunkLocation = ChunkLocations[ChunkIndex];
-
- uint64_t ChunkOffset = LegacyChunkLocation.Offset;
- uint64_t ChunkSize = LegacyChunkLocation.Size;
- uint64_t ChunkEnd = ChunkOffset + ChunkSize;
-
- if (BlockSize == 0)
- {
- BlockOffset = ChunkOffset;
- }
- if ((ChunkEnd - BlockOffset) > MaxBlockSize)
- {
- BlockData BlockRange{.BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex};
- BlockRange.Chunks.swap(Chunks);
- BlockRanges.push_back(BlockRange);
-
- WriteBlockIndex++;
- while (std::filesystem::exists(BlockStore::GetBlockPath(BlocksBasePath, WriteBlockIndex)))
- {
- ++WriteBlockIndex;
- }
- BlockOffset = ChunkOffset;
- BlockSize = 0;
- }
- BlockSize = RoundUp(BlockSize, PayloadAlignment);
- BlockStoreLocation ChunkLocation = {.BlockIndex = WriteBlockIndex, .Offset = ChunkOffset - BlockOffset, .Size = ChunkSize};
- Chunks.push_back({ChunkIndex, ChunkLocation});
- BlockSize = ChunkEnd - BlockOffset;
- }
- if (BlockSize > 0)
- {
- BlockRanges.push_back(
- {.Chunks = std::move(Chunks), .BlockOffset = BlockOffset, .BlockSize = BlockSize, .BlockIndex = WriteBlockIndex});
- }
-
- Stopwatch WriteBlockTimer;
-
- std::reverse(BlockRanges.begin(), BlockRanges.end());
- std::vector<std::uint8_t> Buffer(1 << 28);
- for (size_t Idx = 0; Idx < BlockRanges.size(); ++Idx)
- {
- const BlockData& BlockRange = BlockRanges[Idx];
- if (Idx > 0)
- {
- uint64_t Remaining = BlockRange.BlockOffset + BlockRange.BlockSize;
- uint64_t Completed = BlockOffset + BlockSize - Remaining;
- uint64_t ETA = (WriteBlockTimer.GetElapsedTimeMs() * Remaining) / Completed;
-
- ZEN_INFO("migrating store '{}' {}/{} blocks, remaining {} ({}) ETA: {}",
- BlocksBasePath,
- Idx,
- BlockRanges.size(),
- NiceBytes(BlockRange.BlockOffset + BlockRange.BlockSize),
- NiceBytes(BlockOffset + BlockSize),
- NiceTimeSpanMs(ETA));
- }
-
- std::filesystem::path BlockPath = BlockStore::GetBlockPath(BlocksBasePath, BlockRange.BlockIndex);
- BlockStoreFile ChunkBlock(BlockPath);
- ChunkBlock.Create(BlockRange.BlockSize);
- uint64_t Offset = 0;
- while (Offset < BlockRange.BlockSize)
- {
- uint64_t Size = BlockRange.BlockSize - Offset;
- if (Size > Buffer.size())
- {
- Size = Buffer.size();
- }
- BlockFile.Read(Buffer.data(), Size, BlockRange.BlockOffset + Offset);
- ChunkBlock.Write(Buffer.data(), Size, Offset);
- Offset += Size;
- }
- ChunkBlock.Truncate(Offset);
- ChunkBlock.Flush();
-
- Callback(BlockRange.Chunks);
-
- if (CleanSource)
- {
- BlockFile.SetFileSize(BlockRange.BlockOffset);
- }
- }
- BlockFile.Close();
-
- return true;
-}
-
const char*
BlockStore::GetBlockFileExtension()
{
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index 54e8cb11c..f8fc41341 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -148,7 +148,7 @@ CasImpl::OpenOrCreateManifest()
}
else
{
- ZEN_ERROR("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult));
+ ZEN_WARN("Store manifest validation failed: {:#x}, will generate new manifest to recover", uint32_t(ValidationResult));
}
if (ManifestIsOk)
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index a7fdfa1f5..519478356 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -327,7 +327,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
if (!BadKeys.empty())
{
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
+ ZEN_WARN("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_RootDirectory / m_ContainerBaseName);
if (Ctx.RunRecovery())
{
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 23e3f4cd8..79867dcfa 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -669,7 +669,7 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+ ZEN_WARN("file CAS scrubbing: {} bad chunks found", BadHashes.size());
if (Ctx.RunRecovery())
{
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index fe435cdff..db32efb9f 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -123,7 +123,6 @@ public:
typedef std::function<uint64_t()> ClaimDiskReserveCallback;
typedef std::function<void(size_t ChunkIndex, const void* Data, uint64_t Size)> IterateChunksSmallSizeCallback;
typedef std::function<void(size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size)> IterateChunksLargeSizeCallback;
- typedef std::function<void(const MovedChunksArray& MovedChunks)> SplitCallback;
typedef std::function<void(const BlockStoreLocation& Location)> WriteChunkCallback;
void Initialize(const std::filesystem::path& BlocksBasePath,
@@ -151,15 +150,6 @@ public:
const IterateChunksSmallSizeCallback& SmallSizeCallback,
const IterateChunksLargeSizeCallback& LargeSizeCallback);
- static bool Split(const std::vector<BlockStoreLocation>& ChunkLocations,
- const std::filesystem::path& SourceBlockFilePath,
- const std::filesystem::path& BlocksBasePath,
- uint64_t MaxBlockSize,
- uint64_t MaxBlockCount,
- size_t PayloadAlignment,
- bool CleanSource,
- const SplitCallback& Callback);
-
static const char* GetBlockFileExtension();
static std::filesystem::path GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint32_t BlockIndex);