diff options
| author | Zousar Shaker <[email protected]> | 2025-08-08 10:44:24 -0600 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-08 10:44:24 -0600 |
| commit | 4943f66c15e66205edaf0068a73c5d7a3b2f801a (patch) | |
| tree | 1dff0546e0ae7ae31a8cf0f36771639a4e68d19c | |
| parent | precommit (diff) | |
| parent | 5.6.15 (diff) | |
| download | zen-4943f66c15e66205edaf0068a73c5d7a3b2f801a.tar.xz zen-4943f66c15e66205edaf0068a73c5d7a3b2f801a.zip | |
Merge branch 'main' into zs/put-overwrite-policy
| -rw-r--r-- | CHANGELOG.md | 8 | ||||
| -rw-r--r-- | VERSION.txt | 2 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 8 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 3 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 475 | ||||
| -rw-r--r-- | src/zenserver/buildstore/httpbuildstore.cpp | 21 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 27 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 1 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 782 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/buildstore/buildstore.h | 87 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 42 |
12 files changed, 872 insertions, 588 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 80d0d0ea6..1f1c7a22c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,9 +3,15 @@ - Improvement: Don't set m_DispatchComplete in ParallelWork until after pending work countdown succeeds - Improvement: Safeguard FormatCallstack to not throw exceptions when building the callstack string - Improvement: Limit thread name length when setting it for debugger use -- Improvemnet: Don't allow assert callbacks to throw exception +- Improvement: Don't allow assert callbacks to throw exception - Improvement: When formatting log output for malformed attachments in a package message, allow the string buffer to grow instead of throwing exception +- Improvement: Refactored build store cache to use existing CidStore implementation instead of implementation-specific blob storage + - **CAUTION** This will clear any existing cache when updating as the manifest version and storage strategy have changed +- Improvement: If cloud-ddc requests upload of a blob it earlier reported as good to reuse we treat it as a transient error and attempt to retry - Bugfix: Parents were not notified when successfully attaching to an existing server instance +- Bugfix: BuildStorage cache returned "true" for metadata existence for all blobs that had payloads regardless of actual existence of metadata +- Bugfix: Builds download - don't query for block metadata if no blocks are required +- Bugfix: Add the referenced attachments correctly when storing inline cache bucket records using batch mode ## 5.6.14 - Improvement: If `zen builds upload` fails to upload metadata for a block with a 404 response (due to race condition from hitting different server) we save and retry metadata upload at end of upload diff --git a/VERSION.txt b/VERSION.txt index db4dd40c5..1c3c44766 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -5.6.14
\ No newline at end of file +5.6.15
\ No newline at end of file diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index cd27daa9e..7cb12cd4c 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -2904,8 +2904,10 @@ namespace { } else { - throw std::runtime_error( - fmt::format("Can not upload requested build blob {} as it was not generated by this upload", RawHash)); + ZEN_CONSOLE( + "Warning: Build blob {} was reported as needed for upload but it was reported as existing at the start of the " + "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization", + RawHash); } } uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; @@ -8656,7 +8658,7 @@ namespace { std::vector<ChunkBlockDescription> UnorderedList; tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; - if (Storage.BuildCacheStorage) + if (Storage.BuildCacheStorage && !BlockRawHashes.empty()) { std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); UnorderedList.reserve(CacheBlockMetadatas.size()); diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 598ef9314..64cdbf5c4 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -784,7 +784,8 @@ main(int argc, char** argv) std::string_view ThisArg(argv[j]); PassthroughArgV.push_back(std::string(ThisArg)); - const bool NeedsQuotes = (ThisArg.find(' ') != std::string_view::npos); + const bool NeedsQuotes = + (ThisArg.find(' ') != std::string_view::npos) && !(ThisArg.starts_with("\"") && ThisArg.ends_with("\"")); if (NeedsQuotes) { diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 301f93a55..9ccdf3b5e 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -13,6 +13,7 @@ #include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/memoryview.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/string.h> 
#include <zencore/testutils.h> @@ -22,6 +23,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> #include <zenhttp/zenhttp.h> +#include <zenutil/buildstoragecache.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/chunkrequests.h> @@ -37,6 +39,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> +#include <tsl/robin_set.h> #undef GetObject ZEN_THIRD_PARTY_INCLUDES_END @@ -4157,6 +4160,478 @@ TEST_CASE("workspaces.share") CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); } +TEST_CASE("buildstore.blobs") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector<IoHash> CompressedBlobsHashes; + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + HttpClient::Response Result = + Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); + CHECK(Result); + } + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + 
HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + + for (size_t I = 0; I < 5; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash()); + IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + + 
HttpClient::Response Result = + Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload); + CHECK(Result); + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + for (const IoHash& RawHash : CompressedBlobsHashes) + { + HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash), + HttpClient::Accept(ZenContentType::kCompressedBinary)); + CHECK(Result); + IoBuffer Payload = Result.ResponsePayload; + CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary); + IoHash VerifyRawHash; + uint64_t VerifyRawSize; + CompressedBuffer CompressedBlob = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize); + CHECK(CompressedBlob); + CHECK(VerifyRawHash == RawHash); + IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); + CHECK(IoHash::HashBuffer(Decompressed) == RawHash); + } + } +} + +namespace { + CbObject MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + { + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, BlobHash); + Writer.BeginObject("values"); + { + for (const auto& V : KeyValues) + { + Writer.AddString(V.first, V.second); + } + } + Writer.EndObject(); // values + return Writer.Save(); + }; + +} // namespace + +TEST_CASE("buildstore.metadata") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector<IoHash> BlobHashes; + std::vector<CbObject> Metadatas; + std::vector<IoHash> MetadataHashes; + + 
auto GetMetadatas = + [](HttpClient& Client, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, std::vector<IoHash> BlobHashes) { + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + CHECK(Result); + + std::vector<CbObject> ResultMetadatas; + + CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + ResultMetadatas.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + ResultMetadatas.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return ResultMetadatas; + }; + + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 
0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + const size_t BlobCount = 5; + + for (size_t I = 0; I < BlobCount; I++) + { + BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); + Metadatas.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetadataHashes.push_back(IoHash::HashBuffer(Metadatas.back().GetBuffer().AsIoBuffer())); + } + + { + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = Metadatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(Metadatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/putBlobMetadata", Namespace, Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + CHECK(Result); + } + + { + std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); + + for (size_t Index = 0; Index < MetadataHashes.size(); Index++) + { + const IoHash& ExpectedHash = MetadataHashes[Index]; + IoHash Hash = 
IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); + CHECK_EQ(ExpectedHash, Hash); + } + } + } + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/builds/"); + + std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes); + + for (size_t Index = 0; Index < MetadataHashes.size(); Index++) + { + const IoHash& ExpectedHash = MetadataHashes[Index]; + IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer()); + CHECK_EQ(ExpectedHash, Hash); + } + } +} + +TEST_CASE("buildstore.cache") +{ + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + std::filesystem::path TempDir = TestEnv.CreateNewTestDir(); + auto _ = MakeGuard([&SystemRootPath, &TempDir]() { + DeleteDirectories(SystemRootPath); + DeleteDirectories(TempDir); + }); + + std::string_view Namespace = "ns"sv; + std::string_view Bucket = "bkt"sv; + Oid BuildId = Oid::NewOid(); + + std::vector<IoHash> BlobHashes; + std::vector<CbObject> Metadatas; + std::vector<IoHash> MetadataHashes; + + const size_t BlobCount = 5; + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + + BuildStorageCache::Statistics Stats; + std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); + + { + IoHash NoneBlob = IoHash::HashBuffer("data", 4); + std::vector<BuildStorageCache::BlobExistsResult> NoneExists = Cache->BlobsExists(BuildId, std::vector<IoHash>{NoneBlob}); + CHECK(NoneExists.size() == 1); + CHECK(!NoneExists[0].HasBody); + CHECK(!NoneExists[0].HasMetadata); + } + + 
for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + BlobHashes.push_back(CompressedBlob.DecodeRawHash()); + Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount; I++) + { + CHECK(Exists[I].HasBody); + CHECK(!Exists[I].HasMetadata); + } + + std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(0, FetchedMetadatas.size()); + } + + { + for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); + CHECK(BuildBlob); + CHECK_EQ(BlobHashes[I], + IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); + } + } + + { + for (size_t I = 0; I < BlobCount; I++) + { + CbObject Metadata = MakeMetadata(BlobHashes[I], + {{"key", fmt::format("{}", I)}, + {"key_plus_one", fmt::format("{}", I + 1)}, + {"block_hash", fmt::format("{}", BlobHashes[I])}}); + Metadatas.push_back(Metadata); + MetadataHashes.push_back(IoHash::HashBuffer(Metadata.GetBuffer().AsIoBuffer())); + } + Cache->PutBlobMetadatas(BuildId, BlobHashes, Metadatas); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount; I++) + { + CHECK(Exists[I].HasBody); + CHECK(Exists[I].HasMetadata); + } + + std::vector<CbObject> FetchedMetadatas 
= Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } + + for (size_t I = 0; I < BlobCount; I++) + { + IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob))); + BlobHashes.push_back(CompressedBlob.DecodeRawHash()); + Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed()); + } + + Cache->Flush(500); + Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false); + + { + std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == BlobHashes.size()); + for (size_t I = 0; I < BlobCount * 2; I++) + { + CHECK(Exists[I].HasBody); + CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); + } + + std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, MetaDatas.size()); + + std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } + } + + { + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + + BuildStorageCache::Statistics Stats; + std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false)); + + std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes); + CHECK(Exists.size() == 
BlobHashes.size()); + for (size_t I = 0; I < BlobCount * 2; I++) + { + CHECK(Exists[I].HasBody); + CHECK_EQ(I < BlobCount, Exists[I].HasMetadata); + } + + for (size_t I = 0; I < BlobCount * 2; I++) + { + IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]); + CHECK(BuildBlob); + CHECK_EQ(BlobHashes[I], + IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer())); + } + + std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, MetaDatas.size()); + + std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes); + CHECK_EQ(BlobCount, FetchedMetadatas.size()); + + for (size_t I = 0; I < BlobCount; I++) + { + CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer())); + } + } +} + # if 0 TEST_CASE("lifetime.owner") { diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp index bcec74ce6..3e1bc4203 100644 --- a/src/zenserver/buildstore/httpbuildstore.cpp +++ b/src/zenserver/buildstore/httpbuildstore.cpp @@ -266,7 +266,7 @@ HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req) BlobsArrayIt++; MetadataArrayIt++; } - m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads); + m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads, &GetSmallWorkerPool(EWorkloadType::Burst)); return ServerRequest.WriteResponse(HttpResponseCode::OK); } @@ -484,7 +484,7 @@ HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req) ResponseWriter.BeginArray("metadataExists"sv); for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists) { - ResponseWriter.AddBool(BlobExists.HasBody); + ResponseWriter.AddBool(BlobExists.HasMetadata); if (BlobExists.HasMetadata) { m_BuildStoreStats.BlobExistsMetaHitCount++; @@ -529,22 +529,11 @@ HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request) BuildStore::StorageStats StorageStats = 
m_BuildStore.GetStorageStats(); Cbo << "count" << StorageStats.EntryCount; - Cbo << "bytes" << StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes + StorageStats.MetadataByteCount; + Cbo << "bytes" << StorageStats.BlobBytes + StorageStats.MetadataByteCount; Cbo.BeginObject("blobs"); { - Cbo << "count" << (StorageStats.LargeBlobCount + StorageStats.SmallBlobCount); - Cbo << "bytes" << (StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes); - Cbo.BeginObject("large"); - { - Cbo << "count" << StorageStats.LargeBlobCount; - Cbo << "bytes" << StorageStats.LargeBlobBytes; - } - Cbo.EndObject(); // large - Cbo.BeginObject("small"); - { - Cbo << "count" << StorageStats.SmallBlobCount; - Cbo << "bytes" << StorageStats.SmallBlobBytes; - } + Cbo << "count" << StorageStats.BlobCount; + Cbo << "bytes" << StorageStats.BlobBytes; Cbo.EndObject(); // small } Cbo.EndObject(); // blobs diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 7e3baa997..6c9f2ed21 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -265,10 +265,15 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen if (ServerOptions.BuildStoreConfig.Enabled) { + CidStoreConfiguration BuildCidConfig; + BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas"; + m_BuildCidStore = std::make_unique<CidStore>(m_GcManager); + m_BuildCidStore->Initialize(BuildCidConfig); + BuildStoreConfig BuildsCfg; BuildsCfg.RootDirectory = m_DataRoot / "builds"; BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit; - m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager); + m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore); } if (ServerOptions.StructuredCacheConfig.Enabled) @@ -666,6 +671,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) m_StatsReporter.AddProvider(m_CacheStore.Get()); 
m_StatsReporter.AddProvider(m_CidStore.get()); + m_StatsReporter.AddProvider(m_BuildCidStore.get()); } void @@ -838,6 +844,7 @@ ZenServer::Cleanup() m_BuildStoreService.reset(); m_BuildStore = {}; + m_BuildCidStore.reset(); m_StructuredCacheService.reset(); m_UpstreamService.reset(); @@ -1037,9 +1044,18 @@ ZenServer::ScrubStorage() WorkerThreadPool ThreadPool{1, "Scrub"}; ScrubContext Ctx{ThreadPool}; - m_CidStore->ScrubStorage(Ctx); - m_ProjectStore->ScrubStorage(Ctx); - m_StructuredCacheService->ScrubStorage(Ctx); + + if (m_CidStore) + m_CidStore->ScrubStorage(Ctx); + + if (m_ProjectStore) + m_ProjectStore->ScrubStorage(Ctx); + + if (m_StructuredCacheService) + m_StructuredCacheService->ScrubStorage(Ctx); + + if (m_BuildCidStore) + m_BuildCidStore->ScrubStorage(Ctx); const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); @@ -1061,6 +1077,9 @@ ZenServer::Flush() if (m_ProjectStore) m_ProjectStore->Flush(); + + if (m_BuildCidStore) + m_BuildCidStore->Flush(); } void diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 5cfa04ba1..0dcab6ec4 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -128,6 +128,7 @@ private: Ref<ZenCacheStore> m_CacheStore; std::unique_ptr<OpenProcessCache> m_OpenProcessCache; HttpTestService m_TestService; + std::unique_ptr<CidStore> m_BuildCidStore; std::unique_ptr<BuildStore> m_BuildStore; #if ZEN_WITH_TESTS diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index 20dc55bca..1b2cf036b 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -3,6 +3,7 @@ #include <zenstore/buildstore/buildstore.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory/llm.h> @@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> -# include <zencore/compress.h> # include 
<zencore/testing.h> # include <zencore/testutils.h> # include <zenutil/workerpools.h> @@ -45,7 +45,7 @@ namespace blobstore::impl { const char* LogExtension = ".slog"; const char* AccessTimeExtension = ".zacs"; - const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0); + const uint32_t ManifestVersion = (2 << 16) | (0 << 8) | (0); std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory) { @@ -106,13 +106,11 @@ namespace blobstore::impl { } // namespace blobstore::impl -BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) +BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore) : m_Log(logging::Get("builds")) , m_Config(Config) , m_Gc(Gc) -, m_LargeBlobStore(m_Gc) -, m_SmallBlobStore(Gc) -, m_MetadataBlockStore() +, m_BlobStore(BlobStore) { ZEN_TRACE_CPU("BuildStore::BuildStore"); ZEN_MEMSCOPE(GetBuildstoreTag()); @@ -170,75 +168,16 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) ManifestWriter.AddDateTime("createdAt", DateTime::Now()); TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer()); } - m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew); - m_SmallBlobStore.Initialize(Config.RootDirectory, - "blob_cas", - m_Config.SmallBlobBlockStoreMaxBlockSize, - m_Config.SmallBlobBlockStoreAlignement, - IsNew); - m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); - - BlockStore::BlockIndexSet KnownBlocks; - for (const BlobEntry& Blob : m_BlobEntries) - { - if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) - { - const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; - KnownBlocks.insert(Metadata.Location.BlockIndex); - } - } - BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, 
CasLogFile::Mode::kWrite); - if (!MissingBlocks.empty()) - { - std::vector<MetadataDiskEntry> MissingMetadatas; - for (auto& It : m_BlobLookup) - { - const IoHash& BlobHash = It.first; - const BlobIndex ReadBlobIndex = It.second; - const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; - if (ReadBlobEntry.Metadata) - { - const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata]; - if (MissingBlocks.contains(MetaData.Location.BlockIndex)) - { - MissingMetadatas.push_back( - MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash}); - MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; - m_MetadataEntries[ReadBlobEntry.Metadata] = {}; - m_BlobEntries[ReadBlobIndex].Metadata = {}; - } - } - } - ZEN_ASSERT(!MissingMetadatas.empty()); - - for (const MetadataDiskEntry& Entry : MissingMetadatas) - { - auto It = m_BlobLookup.find(Entry.BlobHash); - ZEN_ASSERT(It != m_BlobLookup.end()); - - const BlobIndex ReadBlobIndex = It->second; - const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; - if (!ReadBlobEntry.Payload) - { - m_BlobLookup.erase(It); - } - } - m_MetadatalogFile.Append(MissingMetadatas); - CompactState(); - } - m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); - m_Gc.AddGcStorage(this); } catch (const std::exception& Ex) { ZEN_ERROR("Failed to initialize build store. 
Reason: '{}'", Ex.what()); - m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); } @@ -249,7 +188,6 @@ BuildStore::~BuildStore() try { ZEN_TRACE_CPU("BuildStore::~BuildStore"); - m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); Flush(); @@ -280,21 +218,12 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) } } - uint64_t PayloadSize = Payload.GetSize(); - PayloadEntry Entry; - if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize) - { - CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash); - ZEN_UNUSED(Result); - Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize); - } - else - { - CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash); - ZEN_UNUSED(Result); - Entry = PayloadEntry(0, PayloadSize); - } + uint64_t PayloadSize = Payload.GetSize(); + CidStore::InsertResult Result = m_BlobStore.AddChunk(Payload, BlobHash); + PayloadEntry Entry = PayloadEntry(0, PayloadSize); + ; + IoHash MetadataHash; { RwLock::ExclusiveLockScope _(m_Lock); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) @@ -310,6 +239,10 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); } + if (Blob.Metadata) + { + MetadataHash = m_MetadataEntries[Blob.Metadata].MetadataHash; + } Blob.LastAccessTime = GcClock::TickCount(); } else @@ -322,6 +255,16 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } + + m_LastAccessTimeUpdateCount++; + if (m_TrackedBlobKeys) + { + m_TrackedBlobKeys->push_back(BlobHash); + if (MetadataHash != IoHash::Zero) + { + m_TrackedBlobKeys->push_back(BlobHash); 
+ } + } } m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); m_LastAccessTimeUpdateCount++; @@ -340,21 +283,9 @@ BuildStore::GetBlob(const IoHash& BlobHash) Blob.LastAccessTime = GcClock::TickCount(); if (Blob.Payload) { - const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload]; - const bool IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0; Lock.ReleaseNow(); + IoBuffer Chunk = m_BlobStore.FindChunkByCid(BlobHash); - IoBuffer Chunk; - if (IsStandalone) - { - ZEN_TRACE_CPU("GetLarge"); - Chunk = m_LargeBlobStore.FindChunk(BlobHash); - } - else - { - ZEN_TRACE_CPU("GetSmall"); - Chunk = m_SmallBlobStore.FindChunk(BlobHash); - } if (Chunk) { Chunk.SetContentType(ZenContentType::kCompressedBinary); @@ -362,7 +293,7 @@ BuildStore::GetBlob(const IoHash& BlobHash) } else { - ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block"); + ZEN_WARN("Inconsistencies in build store, {} is in index but not in blob store", BlobHash); } } } @@ -381,10 +312,10 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) { if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - const BlobIndex ExistingBlobIndex = It->second; - BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; - bool HasPayload = !!Blob.Payload; - bool HasMetadata = !!Blob.Metadata; + const BlobIndex ExistingBlobIndex = It->second; + const BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + bool HasPayload = !!Blob.Payload; + bool HasMetadata = !!Blob.Metadata; Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata}); } else @@ -396,20 +327,82 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes) } void -BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas) +BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> Metadatas, WorkerThreadPool* OptionalWorkerPool) { 
ZEN_TRACE_CPU("BuildStore::PutMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); - size_t WriteBlobIndex = 0; - m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) { + std::vector<IoHash> MetadataHashes; + std::vector<IoBuffer> CompressedMetadataBuffers; + + auto CompressOne = [&BlobHashes, &MetadataHashes, &CompressedMetadataBuffers](const IoBuffer& Buffer, size_t Index) { + if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + if (!CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) + { + throw std::runtime_error( + fmt::format("Invalid compressed buffer provided when storing metadata for blob {}", BlobHashes[Index])); + } + else + { + CompressedMetadataBuffers[Index] = Buffer; + MetadataHashes[Index] = RawHash; + } + } + else + { + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::None); + MetadataHashes[Index] = Compressed.DecodeRawHash(); + CompressedMetadataBuffers[Index] = std::move(Compressed.GetCompressed()).Flatten().AsIoBuffer(); + CompressedMetadataBuffers[Index].SetContentType(ZenContentType::kCompressedBinary); + } + }; + + MetadataHashes.resize(Metadatas.size()); + CompressedMetadataBuffers.resize(Metadatas.size()); + if (OptionalWorkerPool) + { + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + for (size_t Index = 0; Index < Metadatas.size(); Index++) + { + Work.ScheduleWork( + *OptionalWorkerPool, + [Index, &BlobHashes, &Metadatas, &MetadataHashes, &CompressedMetadataBuffers, &CompressOne](std::atomic<bool>&) { + const IoBuffer& Buffer = Metadatas[Index]; + CompressOne(Buffer, Index); + }, + {}); + } + Work.Wait(); + } + else + { + for (size_t Index = 0; Index < Metadatas.size(); Index++) + { + const IoBuffer& Buffer = Metadatas[Index]; + CompressOne(Buffer, Index); + 
} + } + + std::vector<MetadataDiskEntry> AddedMetadataEntries; + AddedMetadataEntries.reserve(MetadataHashes.size()); + + std::vector<CidStore::InsertResult> InsertResults = m_BlobStore.AddChunks(CompressedMetadataBuffers, MetadataHashes); + ZEN_UNUSED(InsertResults); + { RwLock::ExclusiveLockScope _(m_Lock); - for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++) + for (size_t Index = 0; Index < BlobHashes.size(); Index++) { - const IoBuffer& Data = MetaDatas[WriteBlobIndex]; - const IoHash& BlobHash = BlobHashes[WriteBlobIndex]; - const BlockStoreLocation& Location = Locations[LocationIndex]; + const ZenContentType ContentType = Metadatas[Index].GetContentType(); + const IoHash& BlobHash = BlobHashes[Index]; + const IoHash& MetadataHash = MetadataHashes[Index]; + const IoBuffer& Metadata = CompressedMetadataBuffers[Index]; - MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; + MetadataEntry Entry(MetadataHash, Metadata.GetSize(), ContentType, 0); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { @@ -435,17 +428,16 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } - - m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); - m_LastAccessTimeUpdateCount++; - WriteBlobIndex++; - if (m_TrackedCacheKeys) + if (m_TrackedBlobKeys) { - m_TrackedCacheKeys->insert(BlobHash); + m_TrackedBlobKeys->push_back(BlobHash); + m_TrackedBlobKeys->push_back(MetadataHash); } + AddedMetadataEntries.push_back(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); } - }); + } + m_MetadatalogFile.Append(AddedMetadataEntries); } std::vector<IoBuffer> @@ -453,9 +445,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O { 
ZEN_TRACE_CPU("BuildStore::GetMetadatas"); ZEN_MEMSCOPE(GetBuildstoreTag()); - std::vector<BlockStoreLocation> MetaLocations; - std::vector<size_t> MetaLocationResultIndexes; - MetaLocations.reserve(BlobHashes.size()); + std::vector<IoHash> MetadataHashes; + std::vector<size_t> MetaLocationResultIndexes; + MetadataHashes.reserve(BlobHashes.size()); MetaLocationResultIndexes.reserve(BlobHashes.size()); tsl::robin_set<uint32_t> ReferencedBlocks; @@ -475,10 +467,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (ExistingBlobEntry.Metadata) { const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata]; - MetaLocations.push_back(ExistingMetadataEntry.Location); + MetadataHashes.push_back(ExistingMetadataEntry.MetadataHash); MetaLocationResultIndexes.push_back(Index); - ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex); - ResultContentTypes[Index] = ExistingMetadataEntry.ContentType; + ResultContentTypes[Index] = ExistingMetadataEntry.GetContentType(); } ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount()); m_LastAccessTimeUpdateCount++; @@ -486,100 +477,35 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O } } - auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations, - std::span<const size_t> MetaLocationResultIndexes, - std::span<const size_t> ChunkIndexes, - std::vector<IoBuffer>& Result) { - if (ChunkIndexes.size() < 4) - { - for (size_t ChunkIndex : ChunkIndexes) + m_BlobStore.IterateChunks( + MetadataHashes, + [this, &BlobHashes, &MetadataHashes, &MetaLocationResultIndexes, &Result](size_t Index, const IoBuffer& Payload) { + if (Payload) { - IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]); - if (Chunk) - { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = std::move(Chunk); - } - } - return true; - } - return m_MetadataBlockStore.IterateBlock( - 
MetaLocations, - ChunkIndexes, - [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) { - if (Data != nullptr) + size_t ResultIndex = MetaLocationResultIndexes[Index]; + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer CompressedBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); + if (CompressedBuffer) { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size); - } - return true; - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex]; - Result[ResultIndex] = File.GetChunk(Offset, Size); - return true; - }, - 8u * 1024u); - }; - - if (!MetaLocations.empty()) - { - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - ParallelWork Work(AbortFlag, PauseFlag); - - try - { - m_MetadataBlockStore.IterateChunks( - MetaLocations, - [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( - uint32_t BlockIndex, - std::span<const size_t> ChunkIndexes) -> bool { - ZEN_UNUSED(BlockIndex); - if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + IoBuffer Decompressed = CompressedBuffer.DecompressToComposite().Flatten().AsIoBuffer(); + if (Decompressed) { - return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + Result[ResultIndex] = std::move(Decompressed); } else { - ZEN_ASSERT(OptionalWorkerPool != nullptr); - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, - &Result, - &MetaLocations, - &MetaLocationResultIndexes, - DoOneBlock, - ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - try - { - if (!DoOneBlock(MetaLocations, 
MetaLocationResultIndexes, ChunkIndexes, Result)) - { - AbortFlag.store(true); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); - } - }); - return !Work.IsAborted(); + ZEN_WARN("Metadata {} for blob {} is malformed (not a compressed binary format)", + MetadataHashes[ResultIndex], + BlobHashes[ResultIndex]); } - }); - } - catch (const std::exception& Ex) - { - AbortFlag.store(true); - ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what()); - } + } + } + return true; + }, + OptionalWorkerPool, + 8u * 1024u); - Work.Wait(); - } for (size_t Index = 0; Index < Result.size(); Index++) { if (Result[Index]) @@ -600,9 +526,7 @@ BuildStore::Flush() const auto _ = MakeGuard( [&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_LargeBlobStore.Flush(); - m_SmallBlobStore.Flush(); - m_MetadataBlockStore.Flush(false); + m_BlobStore.Flush(); m_PayloadlogFile.Flush(); m_MetadatalogFile.Flush(); @@ -636,22 +560,14 @@ BuildStore::GetStorageStats() const { const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload]; uint64_t Size = Payload.GetSize(); - if ((Payload.GetFlags() & PayloadEntry::kStandalone) != 0) - { - Result.LargeBlobCount++; - Result.LargeBlobBytes += Size; - } - else - { - Result.SmallBlobCount++; - Result.SmallBlobBytes += Size; - } + Result.BlobCount++; + Result.BlobBytes += Size; } if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; Result.MetadataCount++; - Result.MetadataByteCount += Metadata.Location.Size; + Result.MetadataByteCount += Metadata.GetSize(); } } } @@ -882,10 +798,9 @@ BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesy CasLog.Replay( [&](const MetadataDiskEntry& Record) { std::string InvalidEntryReason; - if (Record.Entry.Flags & 
MetadataEntry::kTombStone) + if (Record.Entry.GetFlags() & MetadataEntry::kTombStone) { // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation - // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end()) { if (!m_BlobEntries[ExistingIt->second].Payload) @@ -1058,7 +973,7 @@ BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string& OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString()); return false; } - if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone)) + if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone)) { OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString()); return false; @@ -1083,30 +998,20 @@ BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::strin OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString()); return false; } - if (Entry.Entry.Location.Size == 0) + if (Entry.Entry.GetSize() == 0) { - OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size); + OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.GetSize()); return false; } - if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0) - { - OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); - return false; - } - if (Entry.Entry.Flags & MetadataEntry::kTombStone) + if (Entry.Entry.GetFlags() & MetadataEntry::kTombStone) { return true; } - if (Entry.Entry.ContentType == ZenContentType::kCOUNT) + if (Entry.Entry.GetContentType() == ZenContentType::kCOUNT) { OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString()); return false; } - if 
(Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0) - { - OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString()); - return false; - } return true; } @@ -1114,31 +1019,76 @@ class BuildStoreGcReferenceChecker : public GcReferenceChecker { public: BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {} + ~BuildStoreGcReferenceChecker() + { + try + { + m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys.reset(); }); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BuildStoreGcReferenceChecker threw exception: '{}'", Ex.what()); + } + } virtual std::string GetGcName(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); } - virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } - - virtual void UpdateLockedState(GcCtx& Ctx) override + virtual void PreCache(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Builds::UpdateLockedState"); - ZEN_MEMSCOPE(GetBuildstoreTag()); + ZEN_TRACE_CPU("Builds::PreCache"); auto Log = [&Ctx]() { return Ctx.Logger; }; - m_References.reserve(m_Store.m_BlobLookup.size()); - for (const auto& It : m_Store.m_BlobLookup) + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: builds [PRECACHE] '{}': found {} references in {}", + m_Store.m_Config.RootDirectory, + m_PrecachedReferences.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys = std::make_unique<std::vector<IoHash>>(); }); + { - const BuildStore::BlobIndex ExistingBlobIndex = It.second; - if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload) + m_PrecachedReferences.reserve(m_Store.m_BlobLookup.size()); + RwLock::SharedLockScope __(m_Store.m_Lock); + for (const auto& It : m_Store.m_BlobLookup) { - m_References.push_back(It.first); + const BuildStore::BlobIndex 
ExistingBlobIndex = It.second; + const BuildStore::BlobEntry& Entry = m_Store.m_BlobEntries[ExistingBlobIndex]; + if (Entry.Payload) + { + m_PrecachedReferences.push_back(It.first); + } + if (Entry.Metadata) + { + const BuildStore::MetadataEntry& MetadataEntry = m_Store.m_MetadataEntries[Entry.Metadata]; + m_PrecachedReferences.push_back(MetadataEntry.MetadataHash); + } } } - FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References); + FilterReferences(Ctx, fmt::format("buildstore [PRECACHE] '{}'", m_Store.m_Config.RootDirectory), m_PrecachedReferences); + } + + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Builds::UpdateLockedState"); + ZEN_MEMSCOPE(GetBuildstoreTag()); + + auto Log = [&Ctx]() { return Ctx.Logger; }; + + ZEN_ASSERT(m_Store.m_TrackedBlobKeys); + + m_AddedReferences = std::move(*m_Store.m_TrackedBlobKeys); + + FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", m_Store.m_Config.RootDirectory), m_AddedReferences); } virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override @@ -1165,14 +1115,16 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids); + UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences); UsedCount = IoCids.size() - UnusedReferences.size(); return UnusedReferences; } private: BuildStore& m_Store; - std::vector<IoHash> m_References; + std::vector<IoHash> m_PrecachedReferences; + std::vector<IoHash> m_AddedReferences; }; std::string @@ -1184,201 +1136,6 @@ BuildStore::GetGcName(GcCtx& Ctx) return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string()); } -class BuildStoreGcCompator : public GcStoreCompactor -{ - using BlobEntry = BuildStore::BlobEntry; - using PayloadEntry = BuildStore::PayloadEntry; - using MetadataEntry 
= BuildStore::MetadataEntry; - using MetadataDiskEntry = BuildStore::MetadataDiskEntry; - using BlobIndex = BuildStore::BlobIndex; - using PayloadIndex = BuildStore::PayloadIndex; - using MetadataIndex = BuildStore::MetadataIndex; - -public: - BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {} - - virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override - { - ZEN_UNUSED(ClaimDiskReserveCallback); - ZEN_TRACE_CPU("Builds::CompactStore"); - ZEN_MEMSCOPE(GetBuildstoreTag()); - - auto Log = [&Ctx]() { return Ctx.Logger; }; - - Stopwatch Timer; - const auto _ = MakeGuard([&] { - if (!Ctx.Settings.Verbose) - { - return; - } - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}", - m_Store.m_Config.RootDirectory, - NiceBytes(Stats.RemovedDisk), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - - const auto __ = MakeGuard([&] { m_Store.Flush(); }); - - if (!m_RemovedBlobs.empty()) - { - if (Ctx.Settings.CollectSmallObjects) - { - m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); }); - auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); }); - - BlockStore::BlockUsageMap BlockUsage; - { - RwLock::SharedLockScope __(m_Store.m_Lock); - - for (auto LookupIt : m_Store.m_BlobLookup) - { - const BlobIndex ReadBlobIndex = LookupIt.second; - const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex]; - - if (ReadBlobEntry.Metadata) - { - const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata]; - - uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex; - uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement); - - if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != 
BlockUsage.end()) - { - BlockStore::BlockUsageInfo& Info = BlockUsageIt.value(); - Info.EntryCount++; - Info.DiskUsage += ChunkSize; - } - else - { - BlockUsage.insert_or_assign(BlockIndex, - BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1}); - } - } - } - } - - BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90); - BlockStoreCompactState BlockCompactState; - std::vector<IoHash> BlockCompactStateKeys; - BlockCompactState.IncludeBlocks(BlocksToCompact); - - if (BlocksToCompact.size() > 0) - { - { - RwLock::SharedLockScope ___(m_Store.m_Lock); - for (const auto& Entry : m_Store.m_BlobLookup) - { - BlobIndex Index = Entry.second; - - if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) - { - if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location)) - { - BlockCompactStateKeys.push_back(Entry.first); - } - } - } - } - - if (Ctx.Settings.IsDeleteMode) - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks", - m_Store.m_Config.RootDirectory, - BlocksToCompact.size()); - } - - m_Store.m_MetadataBlockStore.CompactBlocks( - BlockCompactState, - m_Store.m_Config.MetadataBlockStoreAlignement, - [&](const BlockStore::MovedChunksArray& MovedArray, - const BlockStore::ChunkIndexArray& ScrubbedArray, - uint64_t FreedDiskSpace) { - std::vector<MetadataDiskEntry> MovedEntries; - MovedEntries.reserve(MovedArray.size()); - RwLock::ExclusiveLockScope _(m_Store.m_Lock); - for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray) - { - size_t ChunkIndex = Moved.first; - const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; - - ZEN_ASSERT(m_Store.m_TrackedCacheKeys); - if (m_Store.m_TrackedCacheKeys->contains(Key)) - { - continue; - } - - if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) - { - const BlobIndex Index = It->second; - - if (MetadataIndex Meta = 
m_Store.m_BlobEntries[Index].Metadata; Meta) - { - m_Store.m_MetadataEntries[Meta].Location = Moved.second; - MovedEntries.push_back( - MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); - } - } - } - - for (size_t Scrubbed : ScrubbedArray) - { - const IoHash& Key = BlockCompactStateKeys[Scrubbed]; - if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end()) - { - const BlobIndex Index = It->second; - - if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta) - { - MovedEntries.push_back( - MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key}); - MovedEntries.back().Entry.Flags |= MetadataEntry::kTombStone; - m_Store.m_MetadataEntries[Meta] = {}; - m_Store.m_BlobEntries[Index].Metadata = {}; - } - } - } - - m_Store.m_MetadatalogFile.Append(MovedEntries); - - Stats.RemovedDisk += FreedDiskSpace; - if (Ctx.IsCancelledFlag.load()) - { - return false; - } - return true; - }, - ClaimDiskReserveCallback, - fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory)); - } - else - { - if (Ctx.Settings.Verbose) - { - ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks", - m_Store.m_Config.RootDirectory, - BlocksToCompact.size()); - } - } - } - } - } - } - - virtual std::string GetGcName(GcCtx& Ctx) override - { - ZEN_UNUSED(Ctx); - ZEN_MEMSCOPE(GetBuildstoreTag()); - - return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string()); - } - -private: - BuildStore& m_Store; - const std::vector<IoHash> m_RemovedBlobs; -}; - GcStoreCompactor* BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { @@ -1413,10 +1170,9 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) uint64_t BlobSize = 0; }; - bool DiskSizeExceeded = false; - const uint64_t CurrentDiskSize = - m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize(); - if (CurrentDiskSize > 
m_Config.MaxDiskSpaceLimit) + bool DiskSizeExceeded = false; + const uint64_t CurrentBlobsDiskSize = m_BlobStore.TotalSize().TotalSize; + if ((m_Config.MaxDiskSpaceLimit > 0) && (CurrentBlobsDiskSize > m_Config.MaxDiskSpaceLimit)) { DiskSizeExceeded = true; } @@ -1444,14 +1200,14 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) if (ReadBlobEntry.Metadata) { const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata]; - Size += Metadata.Location.Size; + Size += Metadata.GetSize(); } const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime; if (AccessTick < ExpireTicks) { ExpiredBlobs.push_back(It.first); - ExpiredDataSize += ExpiredDataSize; + ExpiredDataSize += Size; } else if (DiskSizeExceeded) { @@ -1469,7 +1225,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) const uint64_t NewSizeLimit = m_Config.MaxDiskSpaceLimit - (m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow - if ((CurrentDiskSize - ExpiredDataSize) > NewSizeLimit) + if ((CurrentBlobsDiskSize - ExpiredDataSize) > NewSizeLimit) { std::vector<size_t> NonExpiredOrder; NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size()); @@ -1487,7 +1243,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) while (It != NonExpiredOrder.end()) { const SizeInfo& Info = NonExpiredBlobSizeInfos[*It]; - if ((CurrentDiskSize - ExpiredDataSize) < NewSizeLimit) + if ((CurrentBlobsDiskSize - ExpiredDataSize) < NewSizeLimit) { break; } @@ -1539,7 +1295,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) { RemoveMetadatas.push_back( MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob}); - RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + RemoveMetadatas.back().Entry.AddFlag(MetadataEntry::kTombStone); m_MetadataEntries[ReadBlobEntry.Metadata] = {}; m_BlobEntries[ReadBlobIndex].Metadata = {}; } @@ -1568,7 +1324,7 @@ 
BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) CompactState(); } - return new BuildStoreGcCompator(*this, std::move(RemovedBlobs)); + return nullptr; } std::vector<GcReferenceChecker*> @@ -1595,21 +1351,6 @@ BuildStore::LockState(GcCtx& Ctx) return Locks; } -void -BuildStore::ScrubStorage(ScrubContext& ScrubCtx) -{ - ZEN_UNUSED(ScrubCtx); - // TODO -} - -GcStorageSize -BuildStore::StorageSize() const -{ - GcStorageSize Result; - Result.DiskSize = m_MetadataBlockStore.TotalSize(); - return Result; -} - /* ___________ __ \__ ___/___ _______/ |_ ______ @@ -1630,8 +1371,10 @@ TEST_CASE("BuildStore.Blobs") std::vector<IoHash> CompressedBlobsHashes; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { @@ -1658,10 +1401,13 @@ TEST_CASE("BuildStore.Blobs") IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer(); CHECK(IoHash::HashBuffer(Decompressed) == RawHash); } + BlobStore.Flush(); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); @@ -1689,8 +1435,10 @@ TEST_CASE("BuildStore.Blobs") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& RawHash : CompressedBlobsHashes) { IoBuffer Payload = Store.GetBlob(RawHash); @@ -1709,7 +1457,7 @@ TEST_CASE("BuildStore.Blobs") } namespace blockstore::testing { - IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues) + IoBuffer MakeMetadata(const IoHash& BlobHash, 
const std::vector<std::pair<std::string, std::string>>& KeyValues) { CbObjectWriter Writer; Writer.AddHash("rawHash"sv, BlobHash); @@ -1740,16 +1488,18 @@ TEST_CASE("BuildStore.Metadata") std::vector<IoHash> BlobHashes; std::vector<IoBuffer> MetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I))); - MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); + MetaPayloads.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}})); MetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(BlobHashes, MetaPayloads); + Store.PutMetadatas(BlobHashes, MetaPayloads, &WorkerPool); std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); @@ -1760,8 +1510,10 @@ TEST_CASE("BuildStore.Metadata") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) @@ -1776,8 +1528,10 @@ TEST_CASE("BuildStore.Metadata") } std::vector<IoHash> CompressedBlobsHashes; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); @@ -1805,14 +1559,16 @@ TEST_CASE("BuildStore.Metadata") std::vector<IoBuffer> 
BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (const IoHash& BlobHash : CompressedBlobsHashes) { - BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1824,8 +1580,10 @@ TEST_CASE("BuildStore.Metadata") } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1847,14 +1605,16 @@ TEST_CASE("BuildStore.Metadata") for (const IoHash& BlobHash : CompressedBlobsHashes) { BlobMetaPayloads.push_back( - MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); + MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore 
Store(Config, Gc, BlobStore); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); @@ -1886,8 +1646,10 @@ TEST_CASE("BuildStore.GC") std::vector<IoHash> CompressedBlobsHashes; std::vector<IoBuffer> BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 5; I++) { IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7); @@ -1900,14 +1662,16 @@ TEST_CASE("BuildStore.GC") } for (const IoHash& BlobHash : CompressedBlobsHashes) { - BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), @@ -1967,8 +1731,10 @@ TEST_CASE("BuildStore.SizeLimit") std::vector<IoHash> CompressedBlobsHashes; std::vector<IoBuffer> BlobMetaPayloads; { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); for (size_t I = 0; I < 64; I++) { IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7); @@ -1981,10 +1747,10 @@ TEST_CASE("BuildStore.SizeLimit") } for (const IoHash& BlobHash : CompressedBlobsHashes) { - 
BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); + BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}})); BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject); } - Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); + Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr); { for (size_t I = 0; I < 64; I++) @@ -1997,8 +1763,10 @@ TEST_CASE("BuildStore.SizeLimit") } } { - GcManager Gc; - BuildStore Store(Config, Gc); + GcManager Gc; + CidStore BlobStore(Gc); + BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"}); + BuildStore Store(Config, Gc, BlobStore); { GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1), @@ -2023,7 +1791,7 @@ TEST_CASE("BuildStore.SizeLimit") CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash); } } - CHECK(DeletedBlobs == 50); + CHECK(DeletedBlobs == 53); std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 502ab1508..e0030230f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1408,7 +1408,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); ZenCacheValue TemporaryValue; TemporaryValue.Value = Batch->Buffers[Index]; @@ -1444,7 +1444,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const 
std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h index adf48dc26..87b7dd812 100644 --- a/src/zenstore/include/zenstore/buildstore/buildstore.h +++ b/src/zenstore/include/zenstore/buildstore/buildstore.h @@ -6,9 +6,8 @@ #include <zencore/iohash.h> #include <zenstore/accesstime.h> #include <zenstore/caslog.h> +#include <zenstore/cidstore.h> #include <zenstore/gc.h> -#include "../compactcas.h" -#include "../filecas.h" ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -19,18 +18,13 @@ namespace zen { struct BuildStoreConfig { std::filesystem::path RootDirectory; - uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024; - uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024; - uint32_t SmallBlobBlockStoreAlignement = 16; - uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024; - uint32_t MetadataBlockStoreAlignement = 8; - uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB + uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB }; -class BuildStore : public GcReferencer, public GcReferenceLocker, public GcStorage +class BuildStore : public GcReferencer, public GcReferenceLocker { public: - explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc); + explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore); virtual ~BuildStore(); void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload); @@ -44,7 +38,7 @@ public: std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes); - void PutMetadatas(std::span<const IoHash> 
BlobHashes, std::span<const IoBuffer> MetaDatas); + void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas, WorkerThreadPool* OptionalWorkerPool); std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool); void Flush(); @@ -52,10 +46,8 @@ public: struct StorageStats { uint64_t EntryCount = 0; - uint64_t LargeBlobCount = 0; - uint64_t LargeBlobBytes = 0; - uint64_t SmallBlobCount = 0; - uint64_t SmallBlobBytes = 0; + uint64_t BlobCount = 0; + uint64_t BlobBytes = 0; uint64_t MetadataCount = 0; uint64_t MetadataByteCount = 0; }; @@ -86,23 +78,18 @@ private: //////// GcReferenceLocker virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; - //////// GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - #pragma pack(push) #pragma pack(1) struct PayloadEntry { PayloadEntry() {} - PayloadEntry(uint64_t Flags, uint64_t Size) + PayloadEntry(uint8_t Flags, uint64_t Size) { ZEN_ASSERT((Size & 0x00ffffffffffffffu) == Size); - ZEN_ASSERT((Flags & (kTombStone | kStandalone)) == Flags); + ZEN_ASSERT((Flags & (kTombStone)) == Flags); FlagsAndSize = (Size << 8) | Flags; } - static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value - static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value uint64_t FlagsAndSize = 0; uint64_t GetSize() const { return FlagsAndSize >> 8; } @@ -126,27 +113,47 @@ private: struct MetadataEntry { - BlockStoreLocation Location; // 12 bytes + IoHash MetadataHash; // 20 bytes + + MetadataEntry() {} - ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte - static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value - uint8_t Flags = 0; // 1 byte + MetadataEntry(const IoHash& Hash, uint64_t Size, ZenContentType ContentType, uint8_t 
Flags) + { + ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size); + ZEN_ASSERT((Flags & kTombStone) == Flags); + FlagsContentTypeAndSize = (Size << 16) | ((uint64_t)ContentType << 8) | Flags; + MetadataHash = Hash; + } + + static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value + + uint64_t GetSize() const { return FlagsContentTypeAndSize >> 16; } + void SetSize(uint64_t Size) + { + ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size); + FlagsContentTypeAndSize = (Size << 16) | (FlagsContentTypeAndSize & 0xffff); + } + + uint8_t GetFlags() const { return uint8_t(FlagsContentTypeAndSize & 0xff); } + void AddFlag(uint8_t Flag) { FlagsContentTypeAndSize |= Flag; } + void SetFlags(uint8_t Flags) { FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffffff00u) | Flags; } + + ZenContentType GetContentType() const { return ZenContentType((FlagsContentTypeAndSize >> 8) & 0xff); } + void SetContentType(ZenContentType ContentType) + { + FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffff00ffu) | (uint16_t(ContentType) << 8); + } - uint8_t Reserved1 = 0; - uint8_t Reserved2 = 0; + uint64_t FlagsContentTypeAndSize = ((uint64_t)ZenContentType::kCOUNT << 8); }; - static_assert(sizeof(MetadataEntry) == 16); + static_assert(sizeof(MetadataEntry) == 28); struct MetadataDiskEntry { - MetadataEntry Entry; // 16 bytes + MetadataEntry Entry; // 28 bytes IoHash BlobHash; // 20 bytes - uint8_t Reserved1 = 0; - uint8_t Reserved2 = 0; - uint8_t Reserved3 = 0; - uint8_t Reserved4 = 0; }; - static_assert(sizeof(MetadataDiskEntry) == 40); + static_assert(sizeof(MetadataDiskEntry) == 48); #pragma pack(pop) @@ -206,17 +213,15 @@ private: std::vector<BlobEntry> m_BlobEntries; tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup; - FileCasStrategy m_LargeBlobStore; - CasContainerStrategy m_SmallBlobStore; - BlockStore m_MetadataBlockStore; + CidStore& m_BlobStore; TCasLogFile<PayloadDiskEntry> m_PayloadlogFile; TCasLogFile<MetadataDiskEntry> 
m_MetadatalogFile; uint64_t m_BlobLogFlushPosition = 0; uint64_t m_MetaLogFlushPosition = 0; - std::unique_ptr<HashSet> m_TrackedCacheKeys; - std::atomic<uint64_t> m_LastAccessTimeUpdateCount; + std::unique_ptr<std::vector<IoHash>> m_TrackedBlobKeys; + std::atomic<uint64_t> m_LastAccessTimeUpdateCount; friend class BuildStoreGcReferenceChecker; friend class BuildStoreGcReferencePruner; diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 1edca5050..a571d1d11 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -35,17 +35,19 @@ ParallelWork::~ParallelWork() m_PendingWork.CountDown(); m_DispatchComplete = true; } - m_PendingWork.Wait(); - ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + const bool WaitSucceeded = m_PendingWork.Wait(); + const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (!WaitSucceeded) + { + ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork); + } if (RemainingWork != 0) { void* Frames[8]; uint32_t FrameCount = GetCallstack(2, 8, Frames); CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); - ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}", - RemainingWork, - CallstackToString(Callstack, " ")); - FreeCallstack(Callstack); + auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); }); + ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork); uint32_t WaitedMs = 0; while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) @@ -53,20 +55,27 @@ ParallelWork::~ParallelWork() Sleep(50); WaitedMs += 50; } - RemainingWork = m_PendingWork.Remaining(); - if (RemainingWork != 0) + ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining(); + if (RemainingWorkAfterSafetyWait != 0) { - ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork) 
+ ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}", + RemainingWork, + RemainingWorkAfterSafetyWait, + NiceLatencyNs(WaitedMs * 1000000u), + CallstackToString(Callstack, " ")) } else { - ZEN_INFO("ParallelWork destructor safety wait succeeded"); + ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}", + RemainingWork, + NiceLatencyNs(WaitedMs * 1000000u), + CallstackToString(Callstack, " ")); } } } catch (const std::exception& Ex) { - ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what()); } } @@ -103,7 +112,16 @@ ParallelWork::Wait() m_PendingWork.CountDown(); m_DispatchComplete = true; - m_PendingWork.Wait(); + const bool WaitSucceeded = m_PendingWork.Wait(); + const ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (!WaitSucceeded) + { + ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork); + } + else if (RemainingWork != 0) + { + ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork); + } RethrowErrors(); } |