aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-08-08 10:44:24 -0600
committerGitHub Enterprise <[email protected]>2025-08-08 10:44:24 -0600
commit4943f66c15e66205edaf0068a73c5d7a3b2f801a (patch)
tree1dff0546e0ae7ae31a8cf0f36771639a4e68d19c
parentprecommit (diff)
parent5.6.15 (diff)
downloadzen-4943f66c15e66205edaf0068a73c5d7a3b2f801a.tar.xz
zen-4943f66c15e66205edaf0068a73c5d7a3b2f801a.zip
Merge branch 'main' into zs/put-overwrite-policy
-rw-r--r--CHANGELOG.md8
-rw-r--r--VERSION.txt2
-rw-r--r--src/zen/cmds/builds_cmd.cpp8
-rw-r--r--src/zen/zen.cpp3
-rw-r--r--src/zenserver-test/zenserver-test.cpp475
-rw-r--r--src/zenserver/buildstore/httpbuildstore.cpp21
-rw-r--r--src/zenserver/zenserver.cpp27
-rw-r--r--src/zenserver/zenserver.h1
-rw-r--r--src/zenstore/buildstore/buildstore.cpp782
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp4
-rw-r--r--src/zenstore/include/zenstore/buildstore/buildstore.h87
-rw-r--r--src/zenutil/parallelwork.cpp42
12 files changed, 872 insertions, 588 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80d0d0ea6..1f1c7a22c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,9 +3,15 @@
- Improvement: Don't set m_DispatchComplete in ParallelWork until after pending work countdown succeeds
- Improvement: Safeguard FormatCallstack to not throw exceptions when building the callstack string
- Improvement: Limit thread name length when setting it for debugger use
-- Improvemnet: Don't allow assert callbacks to throw exception
+- Improvement: Don't allow assert callbacks to throw exception
- Improvement: When formatting log output for malformed attachments in a package message, allow the string buffer to grow instead of throwing exception
+- Improvement: Refactored build store cache to use existing CidStore implementation instead of implementation specific blob storage
+ - **CAUTION** This will clear any existing cache when updating as the manifest version and storage strategy has changed
+- Improvement: If cloud-ddc requests upload of a blob it earlier reported as good to reuse we treat it as a transient error and attempt to retry
- Bugfix: Parents were not notified when successfully attaching to an existing server instance
+- Bugfix: BuildStorage cache return "true" for metadata existance for all blobs that had payloads regardless of actual existance for metadata
+- Bugfix: Builds download - don't query for block metadata if no blocks are required
+- Bugfix: Add the referenced attachments correctly when storing inline cache bucket records using batch mode
## 5.6.14
- Improvement: If `zen builds upload` fails to upload metadata for a block with a 404 response (due to race condition from hitting different server) we save and retry metadata upload at end of upload
diff --git a/VERSION.txt b/VERSION.txt
index db4dd40c5..1c3c44766 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-5.6.14 \ No newline at end of file
+5.6.15 \ No newline at end of file
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index cd27daa9e..7cb12cd4c 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -2904,8 +2904,10 @@ namespace {
}
else
{
- throw std::runtime_error(
- fmt::format("Can not upload requested build blob {} as it was not generated by this upload", RawHash));
+ ZEN_CONSOLE(
+ "Warning: Build blob {} was reported as needed for upload but it was reported as existing at the start of the "
+ "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization",
+ RawHash);
}
}
uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize;
@@ -8656,7 +8658,7 @@ namespace {
std::vector<ChunkBlockDescription> UnorderedList;
tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
- if (Storage.BuildCacheStorage)
+ if (Storage.BuildCacheStorage && !BlockRawHashes.empty())
{
std::vector<CbObject> CacheBlockMetadatas = Storage.BuildCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
UnorderedList.reserve(CacheBlockMetadatas.size());
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 598ef9314..64cdbf5c4 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -784,7 +784,8 @@ main(int argc, char** argv)
std::string_view ThisArg(argv[j]);
PassthroughArgV.push_back(std::string(ThisArg));
- const bool NeedsQuotes = (ThisArg.find(' ') != std::string_view::npos);
+ const bool NeedsQuotes =
+ (ThisArg.find(' ') != std::string_view::npos) && !(ThisArg.starts_with("\"") && ThisArg.ends_with("\""));
if (NeedsQuotes)
{
diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp
index 301f93a55..9ccdf3b5e 100644
--- a/src/zenserver-test/zenserver-test.cpp
+++ b/src/zenserver-test/zenserver-test.cpp
@@ -13,6 +13,7 @@
#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/memoryview.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/testutils.h>
@@ -22,6 +23,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
#include <zenhttp/zenhttp.h>
+#include <zenutil/buildstoragecache.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/chunkrequests.h>
@@ -37,6 +39,7 @@
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
+#include <tsl/robin_set.h>
#undef GetObject
ZEN_THIRD_PARTY_INCLUDES_END
@@ -4157,6 +4160,478 @@ TEST_CASE("workspaces.share")
CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound);
}
+TEST_CASE("buildstore.blobs")
+{
+ std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();
+ auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); });
+
+ std::string_view Namespace = "ns"sv;
+ std::string_view Bucket = "bkt"sv;
+ Oid BuildId = Oid::NewOid();
+
+ std::vector<IoHash> CompressedBlobsHashes;
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ HttpClient::Response Result =
+ Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload);
+ CHECK(Result);
+ }
+
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ CHECK(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+ }
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ CHECK(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+
+ for (size_t I = 0; I < 5; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(5713 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ CompressedBlobsHashes.push_back(CompressedBlob.DecodeRawHash());
+ IoBuffer Payload = std::move(CompressedBlob).GetCompressed().Flatten().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+
+ HttpClient::Response Result =
+ Client.Put(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, CompressedBlobsHashes.back()), Payload);
+ CHECK(Result);
+ }
+ }
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ for (const IoHash& RawHash : CompressedBlobsHashes)
+ {
+ HttpClient::Response Result = Client.Get(fmt::format("{}/{}/{}/blobs/{}", Namespace, Bucket, BuildId, RawHash),
+ HttpClient::Accept(ZenContentType::kCompressedBinary));
+ CHECK(Result);
+ IoBuffer Payload = Result.ResponsePayload;
+ CHECK(Payload.GetContentType() == ZenContentType::kCompressedBinary);
+ IoHash VerifyRawHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::FromCompressed(SharedBuffer(std::move(Payload)), VerifyRawHash, VerifyRawSize);
+ CHECK(CompressedBlob);
+ CHECK(VerifyRawHash == RawHash);
+ IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
+ CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
+ }
+ }
+}
+
+namespace {
+ CbObject MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
+ {
+ CbObjectWriter Writer;
+ Writer.AddHash("rawHash"sv, BlobHash);
+ Writer.BeginObject("values");
+ {
+ for (const auto& V : KeyValues)
+ {
+ Writer.AddString(V.first, V.second);
+ }
+ }
+ Writer.EndObject(); // values
+ return Writer.Save();
+ };
+
+} // namespace
+
+TEST_CASE("buildstore.metadata")
+{
+ std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();
+ auto _ = MakeGuard([&SystemRootPath]() { DeleteDirectories(SystemRootPath); });
+
+ std::string_view Namespace = "ns"sv;
+ std::string_view Bucket = "bkt"sv;
+ Oid BuildId = Oid::NewOid();
+
+ std::vector<IoHash> BlobHashes;
+ std::vector<CbObject> Metadatas;
+ std::vector<IoHash> MetadataHashes;
+
+ auto GetMetadatas =
+ [](HttpClient& Client, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, std::vector<IoHash> BlobHashes) {
+ CbObjectWriter Request;
+
+ Request.BeginArray("blobHashes"sv);
+ for (const IoHash& BlobHash : BlobHashes)
+ {
+ Request.AddHash(BlobHash);
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/getBlobMetadata", Namespace, Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ CHECK(Result);
+
+ std::vector<CbObject> ResultMetadatas;
+
+ CbPackage ResponsePackage = ParsePackageMessage(Result.ResponsePayload);
+ CbObject ResponseObject = ResponsePackage.GetObject();
+
+ CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView();
+ CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView();
+ ResultMetadatas.reserve(MetadatasArray.Num());
+ auto BlobHashesIt = BlobHashes.begin();
+ auto BlobHashArrayIt = begin(BlobHashArray);
+ auto MetadataArrayIt = begin(MetadatasArray);
+ while (MetadataArrayIt != end(MetadatasArray))
+ {
+ const IoHash BlobHash = (*BlobHashArrayIt).AsHash();
+ while (BlobHash != *BlobHashesIt)
+ {
+ ZEN_ASSERT(BlobHashesIt != BlobHashes.end());
+ BlobHashesIt++;
+ }
+
+ ZEN_ASSERT(BlobHash == *BlobHashesIt);
+
+ const IoHash MetaHash = (*MetadataArrayIt).AsAttachment();
+ const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash);
+ ZEN_ASSERT(MetaAttachment);
+
+ CbObject Metadata = MetaAttachment->AsObject();
+ ResultMetadatas.emplace_back(std::move(Metadata));
+
+ BlobHashArrayIt++;
+ MetadataArrayIt++;
+ BlobHashesIt++;
+ }
+ return ResultMetadatas;
+ };
+
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ const size_t BlobCount = 5;
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
+ Metadatas.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
+ MetadataHashes.push_back(IoHash::HashBuffer(Metadatas.back().GetBuffer().AsIoBuffer()));
+ }
+
+ {
+ CbPackage RequestPackage;
+ std::vector<CbAttachment> Attachments;
+ tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
+ Attachments.reserve(BlobCount);
+ AttachmentHashes.reserve(BlobCount);
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.BeginArray("blobHashes");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++)
+ {
+ RequestWriter.AddHash(BlobHashes[BlockHashIndex]);
+ }
+ RequestWriter.EndArray(); // blobHashes
+
+ RequestWriter.BeginArray("metadatas");
+ for (size_t BlockHashIndex = 0; BlockHashIndex < BlobHashes.size(); BlockHashIndex++)
+ {
+ const IoHash ObjectHash = Metadatas[BlockHashIndex].GetHash();
+ RequestWriter.AddBinaryAttachment(ObjectHash);
+ if (!AttachmentHashes.contains(ObjectHash))
+ {
+ Attachments.push_back(CbAttachment(Metadatas[BlockHashIndex], ObjectHash));
+ AttachmentHashes.insert(ObjectHash);
+ }
+ }
+
+ RequestWriter.EndArray(); // metadatas
+
+ RequestPackage.SetObject(RequestWriter.Save());
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage);
+
+ HttpClient::Response Result = Client.Post(fmt::format("{}/{}/{}/blobs/putBlobMetadata", Namespace, Bucket, BuildId),
+ RpcRequestBuffer,
+ ZenContentType::kCbPackage);
+ CHECK(Result);
+ }
+
+ {
+ std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes);
+
+ for (size_t Index = 0; Index < MetadataHashes.size(); Index++)
+ {
+ const IoHash& ExpectedHash = MetadataHashes[Index];
+ IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer());
+ CHECK_EQ(ExpectedHash, Hash);
+ }
+ }
+ }
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/builds/");
+
+ std::vector<CbObject> ResultMetadatas = GetMetadatas(Client, Namespace, Bucket, BuildId, BlobHashes);
+
+ for (size_t Index = 0; Index < MetadataHashes.size(); Index++)
+ {
+ const IoHash& ExpectedHash = MetadataHashes[Index];
+ IoHash Hash = IoHash::HashBuffer(ResultMetadatas[Index].GetBuffer().AsIoBuffer());
+ CHECK_EQ(ExpectedHash, Hash);
+ }
+ }
+}
+
+TEST_CASE("buildstore.cache")
+{
+ std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();
+ std::filesystem::path TempDir = TestEnv.CreateNewTestDir();
+ auto _ = MakeGuard([&SystemRootPath, &TempDir]() {
+ DeleteDirectories(SystemRootPath);
+ DeleteDirectories(TempDir);
+ });
+
+ std::string_view Namespace = "ns"sv;
+ std::string_view Bucket = "bkt"sv;
+ Oid BuildId = Oid::NewOid();
+
+ std::vector<IoHash> BlobHashes;
+ std::vector<CbObject> Metadatas;
+ std::vector<IoHash> MetadataHashes;
+
+ const size_t BlobCount = 5;
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri());
+
+ BuildStorageCache::Statistics Stats;
+ std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false));
+
+ {
+ IoHash NoneBlob = IoHash::HashBuffer("data", 4);
+ std::vector<BuildStorageCache::BlobExistsResult> NoneExists = Cache->BlobsExists(BuildId, std::vector<IoHash>{NoneBlob});
+ CHECK(NoneExists.size() == 1);
+ CHECK(!NoneExists[0].HasBody);
+ CHECK(!NoneExists[0].HasMetadata);
+ }
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ BlobHashes.push_back(CompressedBlob.DecodeRawHash());
+ Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed());
+ }
+
+ Cache->Flush(500);
+ Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false);
+
+ {
+ std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
+ CHECK(Exists.size() == BlobHashes.size());
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CHECK(Exists[I].HasBody);
+ CHECK(!Exists[I].HasMetadata);
+ }
+
+ std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(0, FetchedMetadatas.size());
+ }
+
+ {
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]);
+ CHECK(BuildBlob);
+ CHECK_EQ(BlobHashes[I],
+ IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer()));
+ }
+ }
+
+ {
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CbObject Metadata = MakeMetadata(BlobHashes[I],
+ {{"key", fmt::format("{}", I)},
+ {"key_plus_one", fmt::format("{}", I + 1)},
+ {"block_hash", fmt::format("{}", BlobHashes[I])}});
+ Metadatas.push_back(Metadata);
+ MetadataHashes.push_back(IoHash::HashBuffer(Metadata.GetBuffer().AsIoBuffer()));
+ }
+ Cache->PutBlobMetadatas(BuildId, BlobHashes, Metadatas);
+ }
+
+ Cache->Flush(500);
+ Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false);
+
+ {
+ std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
+ CHECK(Exists.size() == BlobHashes.size());
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CHECK(Exists[I].HasBody);
+ CHECK(Exists[I].HasMetadata);
+ }
+
+ std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(BlobCount, FetchedMetadatas.size());
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer()));
+ }
+ }
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
+ CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(Blob)));
+ BlobHashes.push_back(CompressedBlob.DecodeRawHash());
+ Cache->PutBuildBlob(BuildId, BlobHashes.back(), ZenContentType::kCompressedBinary, CompressedBlob.GetCompressed());
+ }
+
+ Cache->Flush(500);
+ Cache = CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false);
+
+ {
+ std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
+ CHECK(Exists.size() == BlobHashes.size());
+ for (size_t I = 0; I < BlobCount * 2; I++)
+ {
+ CHECK(Exists[I].HasBody);
+ CHECK_EQ(I < BlobCount, Exists[I].HasMetadata);
+ }
+
+ std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(BlobCount, MetaDatas.size());
+
+ std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(BlobCount, FetchedMetadatas.size());
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer()));
+ }
+ }
+ }
+
+ {
+ ZenServerInstance Instance(TestEnv);
+
+ const uint16_t PortNumber =
+ Instance.SpawnServerAndWaitUntilReady(fmt::format("--buildstore-enabled --system-dir {}", SystemRootPath));
+ CHECK(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri());
+
+ BuildStorageCache::Statistics Stats;
+ std::unique_ptr<BuildStorageCache> Cache(CreateZenBuildStorageCache(Client, Stats, Namespace, Bucket, TempDir, false));
+
+ std::vector<BuildStorageCache::BlobExistsResult> Exists = Cache->BlobsExists(BuildId, BlobHashes);
+ CHECK(Exists.size() == BlobHashes.size());
+ for (size_t I = 0; I < BlobCount * 2; I++)
+ {
+ CHECK(Exists[I].HasBody);
+ CHECK_EQ(I < BlobCount, Exists[I].HasMetadata);
+ }
+
+ for (size_t I = 0; I < BlobCount * 2; I++)
+ {
+ IoBuffer BuildBlob = Cache->GetBuildBlob(BuildId, BlobHashes[I]);
+ CHECK(BuildBlob);
+ CHECK_EQ(BlobHashes[I],
+ IoHash::HashBuffer(CompressedBuffer::FromCompressedNoValidate(std::move(BuildBlob)).Decompress().AsIoBuffer()));
+ }
+
+ std::vector<CbObject> MetaDatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(BlobCount, MetaDatas.size());
+
+ std::vector<CbObject> FetchedMetadatas = Cache->GetBlobMetadatas(BuildId, BlobHashes);
+ CHECK_EQ(BlobCount, FetchedMetadatas.size());
+
+ for (size_t I = 0; I < BlobCount; I++)
+ {
+ CHECK_EQ(MetadataHashes[I], IoHash::HashBuffer(FetchedMetadatas[I].GetBuffer().AsIoBuffer()));
+ }
+ }
+}
+
# if 0
TEST_CASE("lifetime.owner")
{
diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp
index bcec74ce6..3e1bc4203 100644
--- a/src/zenserver/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/buildstore/httpbuildstore.cpp
@@ -266,7 +266,7 @@ HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req)
BlobsArrayIt++;
MetadataArrayIt++;
}
- m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads);
+ m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads, &GetSmallWorkerPool(EWorkloadType::Burst));
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -484,7 +484,7 @@ HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req)
ResponseWriter.BeginArray("metadataExists"sv);
for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists)
{
- ResponseWriter.AddBool(BlobExists.HasBody);
+ ResponseWriter.AddBool(BlobExists.HasMetadata);
if (BlobExists.HasMetadata)
{
m_BuildStoreStats.BlobExistsMetaHitCount++;
@@ -529,22 +529,11 @@ HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request)
BuildStore::StorageStats StorageStats = m_BuildStore.GetStorageStats();
Cbo << "count" << StorageStats.EntryCount;
- Cbo << "bytes" << StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes + StorageStats.MetadataByteCount;
+ Cbo << "bytes" << StorageStats.BlobBytes + StorageStats.MetadataByteCount;
Cbo.BeginObject("blobs");
{
- Cbo << "count" << (StorageStats.LargeBlobCount + StorageStats.SmallBlobCount);
- Cbo << "bytes" << (StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes);
- Cbo.BeginObject("large");
- {
- Cbo << "count" << StorageStats.LargeBlobCount;
- Cbo << "bytes" << StorageStats.LargeBlobBytes;
- }
- Cbo.EndObject(); // large
- Cbo.BeginObject("small");
- {
- Cbo << "count" << StorageStats.SmallBlobCount;
- Cbo << "bytes" << StorageStats.SmallBlobBytes;
- }
+ Cbo << "count" << StorageStats.BlobCount;
+ Cbo << "bytes" << StorageStats.BlobBytes;
Cbo.EndObject(); // small
}
Cbo.EndObject(); // blobs
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 7e3baa997..6c9f2ed21 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -265,10 +265,15 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
if (ServerOptions.BuildStoreConfig.Enabled)
{
+ CidStoreConfiguration BuildCidConfig;
+ BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas";
+ m_BuildCidStore = std::make_unique<CidStore>(m_GcManager);
+ m_BuildCidStore->Initialize(BuildCidConfig);
+
BuildStoreConfig BuildsCfg;
BuildsCfg.RootDirectory = m_DataRoot / "builds";
BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit;
- m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager);
+ m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore);
}
if (ServerOptions.StructuredCacheConfig.Enabled)
@@ -666,6 +671,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
m_StatsReporter.AddProvider(m_CacheStore.Get());
m_StatsReporter.AddProvider(m_CidStore.get());
+ m_StatsReporter.AddProvider(m_BuildCidStore.get());
}
void
@@ -838,6 +844,7 @@ ZenServer::Cleanup()
m_BuildStoreService.reset();
m_BuildStore = {};
+ m_BuildCidStore.reset();
m_StructuredCacheService.reset();
m_UpstreamService.reset();
@@ -1037,9 +1044,18 @@ ZenServer::ScrubStorage()
WorkerThreadPool ThreadPool{1, "Scrub"};
ScrubContext Ctx{ThreadPool};
- m_CidStore->ScrubStorage(Ctx);
- m_ProjectStore->ScrubStorage(Ctx);
- m_StructuredCacheService->ScrubStorage(Ctx);
+
+ if (m_CidStore)
+ m_CidStore->ScrubStorage(Ctx);
+
+ if (m_ProjectStore)
+ m_ProjectStore->ScrubStorage(Ctx);
+
+ if (m_StructuredCacheService)
+ m_StructuredCacheService->ScrubStorage(Ctx);
+
+ if (m_BuildCidStore)
+ m_BuildCidStore->ScrubStorage(Ctx);
const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
@@ -1061,6 +1077,9 @@ ZenServer::Flush()
if (m_ProjectStore)
m_ProjectStore->Flush();
+
+ if (m_BuildCidStore)
+ m_BuildCidStore->Flush();
}
void
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 5cfa04ba1..0dcab6ec4 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -128,6 +128,7 @@ private:
Ref<ZenCacheStore> m_CacheStore;
std::unique_ptr<OpenProcessCache> m_OpenProcessCache;
HttpTestService m_TestService;
+ std::unique_ptr<CidStore> m_BuildCidStore;
std::unique_ptr<BuildStore> m_BuildStore;
#if ZEN_WITH_TESTS
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index 20dc55bca..1b2cf036b 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -3,6 +3,7 @@
#include <zenstore/buildstore/buildstore.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory/llm.h>
@@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/compactbinarybuilder.h>
-# include <zencore/compress.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
# include <zenutil/workerpools.h>
@@ -45,7 +45,7 @@ namespace blobstore::impl {
const char* LogExtension = ".slog";
const char* AccessTimeExtension = ".zacs";
- const uint32_t ManifestVersion = (1 << 16) | (0 << 8) | (0);
+ const uint32_t ManifestVersion = (2 << 16) | (0 << 8) | (0);
std::filesystem::path GetManifestPath(const std::filesystem::path& RootDirectory)
{
@@ -106,13 +106,11 @@ namespace blobstore::impl {
} // namespace blobstore::impl
-BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
+BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore)
: m_Log(logging::Get("builds"))
, m_Config(Config)
, m_Gc(Gc)
-, m_LargeBlobStore(m_Gc)
-, m_SmallBlobStore(Gc)
-, m_MetadataBlockStore()
+, m_BlobStore(BlobStore)
{
ZEN_TRACE_CPU("BuildStore::BuildStore");
ZEN_MEMSCOPE(GetBuildstoreTag());
@@ -170,75 +168,16 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc)
ManifestWriter.AddDateTime("createdAt", DateTime::Now());
TemporaryFile::SafeWriteFile(ManifestPath, ManifestWriter.Save().GetBuffer().AsIoBuffer());
}
- m_LargeBlobStore.Initialize(Config.RootDirectory / "file_cas", IsNew);
- m_SmallBlobStore.Initialize(Config.RootDirectory,
- "blob_cas",
- m_Config.SmallBlobBlockStoreMaxBlockSize,
- m_Config.SmallBlobBlockStoreAlignement,
- IsNew);
- m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20);
-
- BlockStore::BlockIndexSet KnownBlocks;
- for (const BlobEntry& Blob : m_BlobEntries)
- {
- if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex)
- {
- const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex];
- KnownBlocks.insert(Metadata.Location.BlockIndex);
- }
- }
- BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks);
m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite);
m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite);
- if (!MissingBlocks.empty())
- {
- std::vector<MetadataDiskEntry> MissingMetadatas;
- for (auto& It : m_BlobLookup)
- {
- const IoHash& BlobHash = It.first;
- const BlobIndex ReadBlobIndex = It.second;
- const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
- if (ReadBlobEntry.Metadata)
- {
- const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata];
- if (MissingBlocks.contains(MetaData.Location.BlockIndex))
- {
- MissingMetadatas.push_back(
- MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash});
- MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
- m_MetadataEntries[ReadBlobEntry.Metadata] = {};
- m_BlobEntries[ReadBlobIndex].Metadata = {};
- }
- }
- }
- ZEN_ASSERT(!MissingMetadatas.empty());
-
- for (const MetadataDiskEntry& Entry : MissingMetadatas)
- {
- auto It = m_BlobLookup.find(Entry.BlobHash);
- ZEN_ASSERT(It != m_BlobLookup.end());
-
- const BlobIndex ReadBlobIndex = It->second;
- const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex];
- if (!ReadBlobEntry.Payload)
- {
- m_BlobLookup.erase(It);
- }
- }
- m_MetadatalogFile.Append(MissingMetadatas);
- CompactState();
- }
-
m_Gc.AddGcReferencer(*this);
m_Gc.AddGcReferenceLocker(*this);
- m_Gc.AddGcStorage(this);
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to initialize build store. Reason: '{}'", Ex.what());
- m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferenceLocker(*this);
m_Gc.RemoveGcReferencer(*this);
}
@@ -249,7 +188,6 @@ BuildStore::~BuildStore()
try
{
ZEN_TRACE_CPU("BuildStore::~BuildStore");
- m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcReferenceLocker(*this);
m_Gc.RemoveGcReferencer(*this);
Flush();
@@ -280,21 +218,12 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
}
}
- uint64_t PayloadSize = Payload.GetSize();
- PayloadEntry Entry;
- if (Payload.GetSize() > m_Config.SmallBlobBlockStoreMaxBlockEmbedSize)
- {
- CasStore::InsertResult Result = m_LargeBlobStore.InsertChunk(Payload, BlobHash);
- ZEN_UNUSED(Result);
- Entry = PayloadEntry(PayloadEntry::kStandalone, PayloadSize);
- }
- else
- {
- CasStore::InsertResult Result = m_SmallBlobStore.InsertChunk(Payload, BlobHash);
- ZEN_UNUSED(Result);
- Entry = PayloadEntry(0, PayloadSize);
- }
+ uint64_t PayloadSize = Payload.GetSize();
+ CidStore::InsertResult Result = m_BlobStore.AddChunk(Payload, BlobHash);
+ PayloadEntry Entry = PayloadEntry(0, PayloadSize);
+ ;
+ IoHash MetadataHash;
{
RwLock::ExclusiveLockScope _(m_Lock);
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
@@ -310,6 +239,10 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size()));
m_PayloadEntries.push_back(Entry);
}
+ if (Blob.Metadata)
+ {
+ MetadataHash = m_MetadataEntries[Blob.Metadata].MetadataHash;
+ }
Blob.LastAccessTime = GcClock::TickCount();
}
else
@@ -322,6 +255,16 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload)
m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
+
+ m_LastAccessTimeUpdateCount++;
+ if (m_TrackedBlobKeys)
+ {
+ m_TrackedBlobKeys->push_back(BlobHash);
+ if (MetadataHash != IoHash::Zero)
+ {
+ m_TrackedBlobKeys->push_back(BlobHash);
+ }
+ }
}
m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
m_LastAccessTimeUpdateCount++;
@@ -340,21 +283,9 @@ BuildStore::GetBlob(const IoHash& BlobHash)
Blob.LastAccessTime = GcClock::TickCount();
if (Blob.Payload)
{
- const PayloadEntry& Entry = m_PayloadEntries[Blob.Payload];
- const bool IsStandalone = (Entry.GetFlags() & PayloadEntry::kStandalone) != 0;
Lock.ReleaseNow();
+ IoBuffer Chunk = m_BlobStore.FindChunkByCid(BlobHash);
- IoBuffer Chunk;
- if (IsStandalone)
- {
- ZEN_TRACE_CPU("GetLarge");
- Chunk = m_LargeBlobStore.FindChunk(BlobHash);
- }
- else
- {
- ZEN_TRACE_CPU("GetSmall");
- Chunk = m_SmallBlobStore.FindChunk(BlobHash);
- }
if (Chunk)
{
Chunk.SetContentType(ZenContentType::kCompressedBinary);
@@ -362,7 +293,7 @@ BuildStore::GetBlob(const IoHash& BlobHash)
}
else
{
- ZEN_WARN("Inconsistencies in build store, {} is in index but not {}", BlobHash, IsStandalone ? "on disk" : "in block");
+ ZEN_WARN("Inconsistencies in build store, {} is in index but not in blob store", BlobHash);
}
}
}
@@ -381,10 +312,10 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
{
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
- const BlobIndex ExistingBlobIndex = It->second;
- BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
- bool HasPayload = !!Blob.Payload;
- bool HasMetadata = !!Blob.Metadata;
+ const BlobIndex ExistingBlobIndex = It->second;
+ const BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex];
+ bool HasPayload = !!Blob.Payload;
+ bool HasMetadata = !!Blob.Metadata;
Result.push_back(BlobExistsResult{.HasBody = HasPayload, .HasMetadata = HasMetadata});
}
else
@@ -396,20 +327,82 @@ BuildStore::BlobsExists(std::span<const IoHash> BlobHashes)
}
void
-BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas)
+BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> Metadatas, WorkerThreadPool* OptionalWorkerPool)
{
ZEN_TRACE_CPU("BuildStore::PutMetadatas");
ZEN_MEMSCOPE(GetBuildstoreTag());
- size_t WriteBlobIndex = 0;
- m_MetadataBlockStore.WriteChunks(MetaDatas, m_Config.MetadataBlockStoreAlignement, [&](std::span<BlockStoreLocation> Locations) {
+ std::vector<IoHash> MetadataHashes;
+ std::vector<IoBuffer> CompressedMetadataBuffers;
+
+ auto CompressOne = [&BlobHashes, &MetadataHashes, &CompressedMetadataBuffers](const IoBuffer& Buffer, size_t Index) {
+ if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (!CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
+ {
+ throw std::runtime_error(
+ fmt::format("Invalid compressed buffer provided when storing metadata for blob {}", BlobHashes[Index]));
+ }
+ else
+ {
+ CompressedMetadataBuffers[Index] = Buffer;
+ MetadataHashes[Index] = RawHash;
+ }
+ }
+ else
+ {
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(Buffer), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ MetadataHashes[Index] = Compressed.DecodeRawHash();
+ CompressedMetadataBuffers[Index] = std::move(Compressed.GetCompressed()).Flatten().AsIoBuffer();
+ CompressedMetadataBuffers[Index].SetContentType(ZenContentType::kCompressedBinary);
+ }
+ };
+
+ MetadataHashes.resize(Metadatas.size());
+ CompressedMetadataBuffers.resize(Metadatas.size());
+ if (OptionalWorkerPool)
+ {
+ std::atomic<bool> AbortFlag;
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
+ for (size_t Index = 0; Index < Metadatas.size(); Index++)
+ {
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [Index, &BlobHashes, &Metadatas, &MetadataHashes, &CompressedMetadataBuffers, &CompressOne](std::atomic<bool>&) {
+ const IoBuffer& Buffer = Metadatas[Index];
+ CompressOne(Buffer, Index);
+ },
+ {});
+ }
+ Work.Wait();
+ }
+ else
+ {
+ for (size_t Index = 0; Index < Metadatas.size(); Index++)
+ {
+ const IoBuffer& Buffer = Metadatas[Index];
+ CompressOne(Buffer, Index);
+ }
+ }
+
+ std::vector<MetadataDiskEntry> AddedMetadataEntries;
+ AddedMetadataEntries.reserve(MetadataHashes.size());
+
+ std::vector<CidStore::InsertResult> InsertResults = m_BlobStore.AddChunks(CompressedMetadataBuffers, MetadataHashes);
+ ZEN_UNUSED(InsertResults);
+ {
RwLock::ExclusiveLockScope _(m_Lock);
- for (size_t LocationIndex = 0; LocationIndex < Locations.size(); LocationIndex++)
+ for (size_t Index = 0; Index < BlobHashes.size(); Index++)
{
- const IoBuffer& Data = MetaDatas[WriteBlobIndex];
- const IoHash& BlobHash = BlobHashes[WriteBlobIndex];
- const BlockStoreLocation& Location = Locations[LocationIndex];
+ const ZenContentType ContentType = Metadatas[Index].GetContentType();
+ const IoHash& BlobHash = BlobHashes[Index];
+ const IoHash& MetadataHash = MetadataHashes[Index];
+ const IoBuffer& Metadata = CompressedMetadataBuffers[Index];
- MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0};
+ MetadataEntry Entry(MetadataHash, Metadata.GetSize(), ContentType, 0);
if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end())
{
@@ -435,17 +428,16 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB
m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())});
m_BlobLookup.insert({BlobHash, NewBlobIndex});
}
-
- m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
-
m_LastAccessTimeUpdateCount++;
- WriteBlobIndex++;
- if (m_TrackedCacheKeys)
+ if (m_TrackedBlobKeys)
{
- m_TrackedCacheKeys->insert(BlobHash);
+ m_TrackedBlobKeys->push_back(BlobHash);
+ m_TrackedBlobKeys->push_back(MetadataHash);
}
+ AddedMetadataEntries.push_back(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash});
}
- });
+ }
+ m_MetadatalogFile.Append(AddedMetadataEntries);
}
std::vector<IoBuffer>
@@ -453,9 +445,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
{
ZEN_TRACE_CPU("BuildStore::GetMetadatas");
ZEN_MEMSCOPE(GetBuildstoreTag());
- std::vector<BlockStoreLocation> MetaLocations;
- std::vector<size_t> MetaLocationResultIndexes;
- MetaLocations.reserve(BlobHashes.size());
+ std::vector<IoHash> MetadataHashes;
+ std::vector<size_t> MetaLocationResultIndexes;
+ MetadataHashes.reserve(BlobHashes.size());
MetaLocationResultIndexes.reserve(BlobHashes.size());
tsl::robin_set<uint32_t> ReferencedBlocks;
@@ -475,10 +467,9 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (ExistingBlobEntry.Metadata)
{
const MetadataEntry& ExistingMetadataEntry = m_MetadataEntries[ExistingBlobEntry.Metadata];
- MetaLocations.push_back(ExistingMetadataEntry.Location);
+ MetadataHashes.push_back(ExistingMetadataEntry.MetadataHash);
MetaLocationResultIndexes.push_back(Index);
- ReferencedBlocks.insert(ExistingMetadataEntry.Location.BlockIndex);
- ResultContentTypes[Index] = ExistingMetadataEntry.ContentType;
+ ResultContentTypes[Index] = ExistingMetadataEntry.GetContentType();
}
ExistingBlobEntry.LastAccessTime = AccessTime(GcClock::TickCount());
m_LastAccessTimeUpdateCount++;
@@ -486,100 +477,35 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
}
}
- auto DoOneBlock = [this](std::span<const BlockStoreLocation> MetaLocations,
- std::span<const size_t> MetaLocationResultIndexes,
- std::span<const size_t> ChunkIndexes,
- std::vector<IoBuffer>& Result) {
- if (ChunkIndexes.size() < 4)
- {
- for (size_t ChunkIndex : ChunkIndexes)
+ m_BlobStore.IterateChunks(
+ MetadataHashes,
+ [this, &BlobHashes, &MetadataHashes, &MetaLocationResultIndexes, &Result](size_t Index, const IoBuffer& Payload) {
+ if (Payload)
{
- IoBuffer Chunk = m_MetadataBlockStore.TryGetChunk(MetaLocations[ChunkIndex]);
- if (Chunk)
- {
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = std::move(Chunk);
- }
- }
- return true;
- }
- return m_MetadataBlockStore.IterateBlock(
- MetaLocations,
- ChunkIndexes,
- [&MetaLocationResultIndexes, &Result](size_t ChunkIndex, const void* Data, uint64_t Size) {
- if (Data != nullptr)
+ size_t ResultIndex = MetaLocationResultIndexes[Index];
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer CompressedBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
+ if (CompressedBuffer)
{
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = IoBuffer(IoBuffer::Clone, Data, Size);
- }
- return true;
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- size_t ResultIndex = MetaLocationResultIndexes[ChunkIndex];
- Result[ResultIndex] = File.GetChunk(Offset, Size);
- return true;
- },
- 8u * 1024u);
- };
-
- if (!MetaLocations.empty())
- {
- std::atomic<bool> AbortFlag;
- std::atomic<bool> PauseFlag;
- ParallelWork Work(AbortFlag, PauseFlag);
-
- try
- {
- m_MetadataBlockStore.IterateChunks(
- MetaLocations,
- [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
- uint32_t BlockIndex,
- std::span<const size_t> ChunkIndexes) -> bool {
- ZEN_UNUSED(BlockIndex);
- if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
+ IoBuffer Decompressed = CompressedBuffer.DecompressToComposite().Flatten().AsIoBuffer();
+ if (Decompressed)
{
- return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ Result[ResultIndex] = std::move(Decompressed);
}
else
{
- ZEN_ASSERT(OptionalWorkerPool != nullptr);
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this,
- &Result,
- &MetaLocations,
- &MetaLocationResultIndexes,
- DoOneBlock,
- ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- try
- {
- if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
- {
- AbortFlag.store(true);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
- }
- });
- return !Work.IsAborted();
+ ZEN_WARN("Metadata {} for blob {} is malformed (not a compressed binary format)",
+ MetadataHashes[ResultIndex],
+ BlobHashes[ResultIndex]);
}
- });
- }
- catch (const std::exception& Ex)
- {
- AbortFlag.store(true);
- ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what());
- }
+ }
+ }
+ return true;
+ },
+ OptionalWorkerPool,
+ 8u * 1024u);
- Work.Wait();
- }
for (size_t Index = 0; Index < Result.size(); Index++)
{
if (Result[Index])
@@ -600,9 +526,7 @@ BuildStore::Flush()
const auto _ = MakeGuard(
[&] { ZEN_INFO("Flushed build store at {} in {}", m_Config.RootDirectory, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); });
- m_LargeBlobStore.Flush();
- m_SmallBlobStore.Flush();
- m_MetadataBlockStore.Flush(false);
+ m_BlobStore.Flush();
m_PayloadlogFile.Flush();
m_MetadatalogFile.Flush();
@@ -636,22 +560,14 @@ BuildStore::GetStorageStats() const
{
const PayloadEntry& Payload = m_PayloadEntries[ReadBlobEntry.Payload];
uint64_t Size = Payload.GetSize();
- if ((Payload.GetFlags() & PayloadEntry::kStandalone) != 0)
- {
- Result.LargeBlobCount++;
- Result.LargeBlobBytes += Size;
- }
- else
- {
- Result.SmallBlobCount++;
- Result.SmallBlobBytes += Size;
- }
+ Result.BlobCount++;
+ Result.BlobBytes += Size;
}
if (ReadBlobEntry.Metadata)
{
const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata];
Result.MetadataCount++;
- Result.MetadataByteCount += Metadata.Location.Size;
+ Result.MetadataByteCount += Metadata.GetSize();
}
}
}
@@ -882,10 +798,9 @@ BuildStore::ReadMetadataLog(const RwLock::ExclusiveLockScope&, const std::filesy
CasLog.Replay(
[&](const MetadataDiskEntry& Record) {
std::string InvalidEntryReason;
- if (Record.Entry.Flags & MetadataEntry::kTombStone)
+ if (Record.Entry.GetFlags() & MetadataEntry::kTombStone)
{
// Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
- // Note: this leaves m_BlobLookup and other arrays with 'holes' in them, this will get clean up in compact gc operation
if (auto ExistingIt = m_BlobLookup.find(Record.BlobHash); ExistingIt != m_BlobLookup.end())
{
if (!m_BlobEntries[ExistingIt->second].Payload)
@@ -1058,7 +973,7 @@ BuildStore::ValidatePayloadDiskEntry(const PayloadDiskEntry& Entry, std::string&
OutReason = fmt::format("Invalid blob hash {}", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone | PayloadEntry::kStandalone))
+ if (Entry.Entry.GetFlags() & ~(PayloadEntry::kTombStone))
{
OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Entry.GetFlags(), Entry.BlobHash.ToHexString());
return false;
@@ -1083,30 +998,20 @@ BuildStore::ValidateMetadataDiskEntry(const MetadataDiskEntry& Entry, std::strin
OutReason = fmt::format("Invalid blob hash {} for meta entry", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Entry.Location.Size == 0)
+ if (Entry.Entry.GetSize() == 0)
{
- OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.Location.Size);
+ OutReason = fmt::format("Invalid meta blob size {} for meta entry", Entry.Entry.GetSize());
return false;
}
- if (Entry.Entry.Reserved1 != 0 || Entry.Entry.Reserved2 != 0)
- {
- OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
- return false;
- }
- if (Entry.Entry.Flags & MetadataEntry::kTombStone)
+ if (Entry.Entry.GetFlags() & MetadataEntry::kTombStone)
{
return true;
}
- if (Entry.Entry.ContentType == ZenContentType::kCOUNT)
+ if (Entry.Entry.GetContentType() == ZenContentType::kCOUNT)
{
OutReason = fmt::format("Invalid content type for meta entry {}", Entry.BlobHash.ToHexString());
return false;
}
- if (Entry.Reserved1 != 0 || Entry.Reserved2 != 0 || Entry.Reserved3 != 0 || Entry.Reserved4 != 0)
- {
- OutReason = fmt::format("Invalid reserved fields for meta entry {}", Entry.BlobHash.ToHexString());
- return false;
- }
return true;
}
@@ -1114,31 +1019,76 @@ class BuildStoreGcReferenceChecker : public GcReferenceChecker
{
public:
BuildStoreGcReferenceChecker(BuildStore& Store) : m_Store(Store) {}
+ ~BuildStoreGcReferenceChecker()
+ {
+ try
+ {
+ m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys.reset(); });
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~BuildStoreGcReferenceChecker threw exception: '{}'", Ex.what());
+ }
+ }
virtual std::string GetGcName(GcCtx& Ctx) override
{
ZEN_UNUSED(Ctx);
return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
}
- virtual void PreCache(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); }
-
- virtual void UpdateLockedState(GcCtx& Ctx) override
+ virtual void PreCache(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Builds::UpdateLockedState");
- ZEN_MEMSCOPE(GetBuildstoreTag());
+ ZEN_TRACE_CPU("Builds::PreCache");
auto Log = [&Ctx]() { return Ctx.Logger; };
- m_References.reserve(m_Store.m_BlobLookup.size());
- for (const auto& It : m_Store.m_BlobLookup)
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: builds [PRECACHE] '{}': found {} references in {}",
+ m_Store.m_Config.RootDirectory,
+ m_PrecachedReferences.size(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ });
+
+ m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedBlobKeys = std::make_unique<std::vector<IoHash>>(); });
+
{
- const BuildStore::BlobIndex ExistingBlobIndex = It.second;
- if (m_Store.m_BlobEntries[ExistingBlobIndex].Payload)
+ m_PrecachedReferences.reserve(m_Store.m_BlobLookup.size());
+ RwLock::SharedLockScope __(m_Store.m_Lock);
+ for (const auto& It : m_Store.m_BlobLookup)
{
- m_References.push_back(It.first);
+ const BuildStore::BlobIndex ExistingBlobIndex = It.second;
+ const BuildStore::BlobEntry& Entry = m_Store.m_BlobEntries[ExistingBlobIndex];
+ if (Entry.Payload)
+ {
+ m_PrecachedReferences.push_back(It.first);
+ }
+ if (Entry.Metadata)
+ {
+ const BuildStore::MetadataEntry& MetadataEntry = m_Store.m_MetadataEntries[Entry.Metadata];
+ m_PrecachedReferences.push_back(MetadataEntry.MetadataHash);
+ }
}
}
- FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", "buildstore"), m_References);
+ FilterReferences(Ctx, fmt::format("buildstore [PRECACHE] '{}'", m_Store.m_Config.RootDirectory), m_PrecachedReferences);
+ }
+
+ virtual void UpdateLockedState(GcCtx& Ctx) override
+ {
+ ZEN_TRACE_CPU("Builds::UpdateLockedState");
+ ZEN_MEMSCOPE(GetBuildstoreTag());
+
+ auto Log = [&Ctx]() { return Ctx.Logger; };
+
+ ZEN_ASSERT(m_Store.m_TrackedBlobKeys);
+
+ m_AddedReferences = std::move(*m_Store.m_TrackedBlobKeys);
+
+ FilterReferences(Ctx, fmt::format("buildstore [LOCKSTATE] '{}'", m_Store.m_Config.RootDirectory), m_AddedReferences);
}
virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override
@@ -1165,14 +1115,16 @@ public:
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids);
+ std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_PrecachedReferences, IoCids);
+ UnusedReferences = KeepUnusedReferences(m_AddedReferences, UnusedReferences);
UsedCount = IoCids.size() - UnusedReferences.size();
return UnusedReferences;
}
private:
BuildStore& m_Store;
- std::vector<IoHash> m_References;
+ std::vector<IoHash> m_PrecachedReferences;
+ std::vector<IoHash> m_AddedReferences;
};
std::string
@@ -1184,201 +1136,6 @@ BuildStore::GetGcName(GcCtx& Ctx)
return fmt::format("buildstore: '{}'", m_Config.RootDirectory.string());
}
-class BuildStoreGcCompator : public GcStoreCompactor
-{
- using BlobEntry = BuildStore::BlobEntry;
- using PayloadEntry = BuildStore::PayloadEntry;
- using MetadataEntry = BuildStore::MetadataEntry;
- using MetadataDiskEntry = BuildStore::MetadataDiskEntry;
- using BlobIndex = BuildStore::BlobIndex;
- using PayloadIndex = BuildStore::PayloadIndex;
- using MetadataIndex = BuildStore::MetadataIndex;
-
-public:
- BuildStoreGcCompator(BuildStore& Store, std::vector<IoHash>&& RemovedBlobs) : m_Store(Store), m_RemovedBlobs(std::move(RemovedBlobs)) {}
-
- virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) override
- {
- ZEN_UNUSED(ClaimDiskReserveCallback);
- ZEN_TRACE_CPU("Builds::CompactStore");
- ZEN_MEMSCOPE(GetBuildstoreTag());
-
- auto Log = [&Ctx]() { return Ctx.Logger; };
-
- Stopwatch Timer;
- const auto _ = MakeGuard([&] {
- if (!Ctx.Settings.Verbose)
- {
- return;
- }
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': RemovedDisk: {} in {}",
- m_Store.m_Config.RootDirectory,
- NiceBytes(Stats.RemovedDisk),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
-
- const auto __ = MakeGuard([&] { m_Store.Flush(); });
-
- if (!m_RemovedBlobs.empty())
- {
- if (Ctx.Settings.CollectSmallObjects)
- {
- m_Store.m_Lock.WithExclusiveLock([this]() { m_Store.m_TrackedCacheKeys = std::make_unique<HashSet>(); });
- auto __ = MakeGuard([this]() { m_Store.m_Lock.WithExclusiveLock([&]() { m_Store.m_TrackedCacheKeys.reset(); }); });
-
- BlockStore::BlockUsageMap BlockUsage;
- {
- RwLock::SharedLockScope __(m_Store.m_Lock);
-
- for (auto LookupIt : m_Store.m_BlobLookup)
- {
- const BlobIndex ReadBlobIndex = LookupIt.second;
- const BlobEntry& ReadBlobEntry = m_Store.m_BlobEntries[ReadBlobIndex];
-
- if (ReadBlobEntry.Metadata)
- {
- const MetadataEntry& ReadMetadataEntry = m_Store.m_MetadataEntries[ReadBlobEntry.Metadata];
-
- uint32_t BlockIndex = ReadMetadataEntry.Location.BlockIndex;
- uint64_t ChunkSize = RoundUp(ReadMetadataEntry.Location.Size, m_Store.m_Config.MetadataBlockStoreAlignement);
-
- if (auto BlockUsageIt = BlockUsage.find(BlockIndex); BlockUsageIt != BlockUsage.end())
- {
- BlockStore::BlockUsageInfo& Info = BlockUsageIt.value();
- Info.EntryCount++;
- Info.DiskUsage += ChunkSize;
- }
- else
- {
- BlockUsage.insert_or_assign(BlockIndex,
- BlockStore::BlockUsageInfo{.DiskUsage = ChunkSize, .EntryCount = 1});
- }
- }
- }
- }
-
- BlockStore::BlockEntryCountMap BlocksToCompact = m_Store.m_MetadataBlockStore.GetBlocksToCompact(BlockUsage, 90);
- BlockStoreCompactState BlockCompactState;
- std::vector<IoHash> BlockCompactStateKeys;
- BlockCompactState.IncludeBlocks(BlocksToCompact);
-
- if (BlocksToCompact.size() > 0)
- {
- {
- RwLock::SharedLockScope ___(m_Store.m_Lock);
- for (const auto& Entry : m_Store.m_BlobLookup)
- {
- BlobIndex Index = Entry.second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- if (BlockCompactState.AddKeepLocation(m_Store.m_MetadataEntries[Meta].Location))
- {
- BlockCompactStateKeys.push_back(Entry.first);
- }
- }
- }
- }
-
- if (Ctx.Settings.IsDeleteMode)
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': compacting {} blocks",
- m_Store.m_Config.RootDirectory,
- BlocksToCompact.size());
- }
-
- m_Store.m_MetadataBlockStore.CompactBlocks(
- BlockCompactState,
- m_Store.m_Config.MetadataBlockStoreAlignement,
- [&](const BlockStore::MovedChunksArray& MovedArray,
- const BlockStore::ChunkIndexArray& ScrubbedArray,
- uint64_t FreedDiskSpace) {
- std::vector<MetadataDiskEntry> MovedEntries;
- MovedEntries.reserve(MovedArray.size());
- RwLock::ExclusiveLockScope _(m_Store.m_Lock);
- for (const std::pair<size_t, BlockStoreLocation>& Moved : MovedArray)
- {
- size_t ChunkIndex = Moved.first;
- const IoHash& Key = BlockCompactStateKeys[ChunkIndex];
-
- ZEN_ASSERT(m_Store.m_TrackedCacheKeys);
- if (m_Store.m_TrackedCacheKeys->contains(Key))
- {
- continue;
- }
-
- if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
- {
- const BlobIndex Index = It->second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- m_Store.m_MetadataEntries[Meta].Location = Moved.second;
- MovedEntries.push_back(
- MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
- }
- }
- }
-
- for (size_t Scrubbed : ScrubbedArray)
- {
- const IoHash& Key = BlockCompactStateKeys[Scrubbed];
- if (auto It = m_Store.m_BlobLookup.find(Key); It != m_Store.m_BlobLookup.end())
- {
- const BlobIndex Index = It->second;
-
- if (MetadataIndex Meta = m_Store.m_BlobEntries[Index].Metadata; Meta)
- {
- MovedEntries.push_back(
- MetadataDiskEntry{.Entry = m_Store.m_MetadataEntries[Meta], .BlobHash = Key});
- MovedEntries.back().Entry.Flags |= MetadataEntry::kTombStone;
- m_Store.m_MetadataEntries[Meta] = {};
- m_Store.m_BlobEntries[Index].Metadata = {};
- }
- }
- }
-
- m_Store.m_MetadatalogFile.Append(MovedEntries);
-
- Stats.RemovedDisk += FreedDiskSpace;
- if (Ctx.IsCancelledFlag.load())
- {
- return false;
- }
- return true;
- },
- ClaimDiskReserveCallback,
- fmt::format("GCV2: buildstore [COMPACT] '{}': ", m_Store.m_Config.RootDirectory));
- }
- else
- {
- if (Ctx.Settings.Verbose)
- {
- ZEN_INFO("GCV2: buildstore [COMPACT] '{}': skipped compacting of {} eligible blocks",
- m_Store.m_Config.RootDirectory,
- BlocksToCompact.size());
- }
- }
- }
- }
- }
- }
-
- virtual std::string GetGcName(GcCtx& Ctx) override
- {
- ZEN_UNUSED(Ctx);
- ZEN_MEMSCOPE(GetBuildstoreTag());
-
- return fmt::format("buildstore: '{}'", m_Store.m_Config.RootDirectory.string());
- }
-
-private:
- BuildStore& m_Store;
- const std::vector<IoHash> m_RemovedBlobs;
-};
-
GcStoreCompactor*
BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
@@ -1413,10 +1170,9 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
uint64_t BlobSize = 0;
};
- bool DiskSizeExceeded = false;
- const uint64_t CurrentDiskSize =
- m_LargeBlobStore.StorageSize().DiskSize + m_SmallBlobStore.StorageSize().DiskSize + m_MetadataBlockStore.TotalSize();
- if (CurrentDiskSize > m_Config.MaxDiskSpaceLimit)
+ bool DiskSizeExceeded = false;
+ const uint64_t CurrentBlobsDiskSize = m_BlobStore.TotalSize().TotalSize;
+ if ((m_Config.MaxDiskSpaceLimit > 0) && (CurrentBlobsDiskSize > m_Config.MaxDiskSpaceLimit))
{
DiskSizeExceeded = true;
}
@@ -1444,14 +1200,14 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
if (ReadBlobEntry.Metadata)
{
const MetadataEntry& Metadata = m_MetadataEntries[ReadBlobEntry.Metadata];
- Size += Metadata.Location.Size;
+ Size += Metadata.GetSize();
}
const GcClock::Tick AccessTick = ReadBlobEntry.LastAccessTime;
if (AccessTick < ExpireTicks)
{
ExpiredBlobs.push_back(It.first);
- ExpiredDataSize += ExpiredDataSize;
+ ExpiredDataSize += Size;
}
else if (DiskSizeExceeded)
{
@@ -1469,7 +1225,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
const uint64_t NewSizeLimit =
m_Config.MaxDiskSpaceLimit -
(m_Config.MaxDiskSpaceLimit >> 4); // Remove a bit more than just below the limit so we have some space to grow
- if ((CurrentDiskSize - ExpiredDataSize) > NewSizeLimit)
+ if ((CurrentBlobsDiskSize - ExpiredDataSize) > NewSizeLimit)
{
std::vector<size_t> NonExpiredOrder;
NonExpiredOrder.resize(NonExpiredBlobSizeInfos.size());
@@ -1487,7 +1243,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
while (It != NonExpiredOrder.end())
{
const SizeInfo& Info = NonExpiredBlobSizeInfos[*It];
- if ((CurrentDiskSize - ExpiredDataSize) < NewSizeLimit)
+ if ((CurrentBlobsDiskSize - ExpiredDataSize) < NewSizeLimit)
{
break;
}
@@ -1539,7 +1295,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
{
RemoveMetadatas.push_back(
MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = ExpiredBlob});
- RemoveMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone;
+ RemoveMetadatas.back().Entry.AddFlag(MetadataEntry::kTombStone);
m_MetadataEntries[ReadBlobEntry.Metadata] = {};
m_BlobEntries[ReadBlobIndex].Metadata = {};
}
@@ -1568,7 +1324,7 @@ BuildStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
CompactState();
}
- return new BuildStoreGcCompator(*this, std::move(RemovedBlobs));
+ return nullptr;
}
std::vector<GcReferenceChecker*>
@@ -1595,21 +1351,6 @@ BuildStore::LockState(GcCtx& Ctx)
return Locks;
}
-void
-BuildStore::ScrubStorage(ScrubContext& ScrubCtx)
-{
- ZEN_UNUSED(ScrubCtx);
- // TODO
-}
-
-GcStorageSize
-BuildStore::StorageSize() const
-{
- GcStorageSize Result;
- Result.DiskSize = m_MetadataBlockStore.TotalSize();
- return Result;
-}
-
/*
___________ __
\__ ___/___ _______/ |_ ______
@@ -1630,8 +1371,10 @@ TEST_CASE("BuildStore.Blobs")
std::vector<IoHash> CompressedBlobsHashes;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
@@ -1658,10 +1401,13 @@ TEST_CASE("BuildStore.Blobs")
IoBuffer Decompressed = CompressedBlob.Decompress().AsIoBuffer();
CHECK(IoHash::HashBuffer(Decompressed) == RawHash);
}
+ BlobStore.Flush();
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& RawHash : CompressedBlobsHashes)
{
IoBuffer Payload = Store.GetBlob(RawHash);
@@ -1689,8 +1435,10 @@ TEST_CASE("BuildStore.Blobs")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& RawHash : CompressedBlobsHashes)
{
IoBuffer Payload = Store.GetBlob(RawHash);
@@ -1709,7 +1457,7 @@ TEST_CASE("BuildStore.Blobs")
}
namespace blockstore::testing {
- IoBuffer MakeMetaData(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
+ IoBuffer MakeMetadata(const IoHash& BlobHash, const std::vector<std::pair<std::string, std::string>>& KeyValues)
{
CbObjectWriter Writer;
Writer.AddHash("rawHash"sv, BlobHash);
@@ -1740,16 +1488,18 @@ TEST_CASE("BuildStore.Metadata")
std::vector<IoHash> BlobHashes;
std::vector<IoBuffer> MetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
BlobHashes.push_back(IoHash::HashBuffer(&I, sizeof(I)));
- MetaPayloads.push_back(MakeMetaData(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
+ MetaPayloads.push_back(MakeMetadata(BlobHashes.back(), {{"index", fmt::format("{}", I)}}));
MetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(BlobHashes, MetaPayloads);
+ Store.PutMetadatas(BlobHashes, MetaPayloads, &WorkerPool);
std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
@@ -1760,8 +1510,10 @@ TEST_CASE("BuildStore.Metadata")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool);
CHECK(ValidateMetaPayloads.size() == MetaPayloads.size());
for (size_t I = 0; I < ValidateMetaPayloads.size(); I++)
@@ -1776,8 +1528,10 @@ TEST_CASE("BuildStore.Metadata")
}
std::vector<IoHash> CompressedBlobsHashes;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
@@ -1805,14 +1559,16 @@ TEST_CASE("BuildStore.Metadata")
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1824,8 +1580,10 @@ TEST_CASE("BuildStore.Metadata")
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1847,14 +1605,16 @@ TEST_CASE("BuildStore.Metadata")
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
BlobMetaPayloads.push_back(
- MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
+ MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}, {"replaced", fmt::format("{}", true)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, &WorkerPool);
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
@@ -1886,8 +1646,10 @@ TEST_CASE("BuildStore.GC")
std::vector<IoHash> CompressedBlobsHashes;
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 5; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(4711 + I * 7);
@@ -1900,14 +1662,16 @@ TEST_CASE("BuildStore.GC")
}
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr);
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
{
GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
@@ -1967,8 +1731,10 @@ TEST_CASE("BuildStore.SizeLimit")
std::vector<IoHash> CompressedBlobsHashes;
std::vector<IoBuffer> BlobMetaPayloads;
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
for (size_t I = 0; I < 64; I++)
{
IoBuffer Blob = CreateSemiRandomBlob(65537 + I * 7);
@@ -1981,10 +1747,10 @@ TEST_CASE("BuildStore.SizeLimit")
}
for (const IoHash& BlobHash : CompressedBlobsHashes)
{
- BlobMetaPayloads.push_back(MakeMetaData(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
+ BlobMetaPayloads.push_back(MakeMetadata(BlobHash, {{"blobHash", fmt::format("{}", BlobHash)}}));
BlobMetaPayloads.back().SetContentType(ZenContentType::kCbObject);
}
- Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads);
+ Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads, nullptr);
{
for (size_t I = 0; I < 64; I++)
@@ -1997,8 +1763,10 @@ TEST_CASE("BuildStore.SizeLimit")
}
}
{
- GcManager Gc;
- BuildStore Store(Config, Gc);
+ GcManager Gc;
+ CidStore BlobStore(Gc);
+ BlobStore.Initialize({.RootDirectory = _.Path() / "build_cas"});
+ BuildStore Store(Config, Gc, BlobStore);
{
GcResult Result = Gc.CollectGarbage(GcSettings{.BuildStoreExpireTime = GcClock::Now() - std::chrono::hours(1),
@@ -2023,7 +1791,7 @@ TEST_CASE("BuildStore.SizeLimit")
CHECK(IoHash::HashBuffer(DecompressedBlob) == BlobHash);
}
}
- CHECK(DeletedBlobs == 50);
+ CHECK(DeletedBlobs == 53);
std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr);
CHECK(MetadataPayloads.size() == BlobMetaPayloads.size());
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 502ab1508..e0030230f 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1408,7 +1408,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
- ZEN_ASSERT(HashKeyAndReferences.size() > 0);
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
ZenCacheValue TemporaryValue;
TemporaryValue.Value = Batch->Buffers[Index];
@@ -1444,7 +1444,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
const std::vector<IoHash>& HashKeyAndReferences =
Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
- ZEN_ASSERT(HashKeyAndReferences.size() > 0);
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
if (m_TrackedCacheKeys)
diff --git a/src/zenstore/include/zenstore/buildstore/buildstore.h b/src/zenstore/include/zenstore/buildstore/buildstore.h
index adf48dc26..87b7dd812 100644
--- a/src/zenstore/include/zenstore/buildstore/buildstore.h
+++ b/src/zenstore/include/zenstore/buildstore/buildstore.h
@@ -6,9 +6,8 @@
#include <zencore/iohash.h>
#include <zenstore/accesstime.h>
#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include "../compactcas.h"
-#include "../filecas.h"
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -19,18 +18,13 @@ namespace zen {
struct BuildStoreConfig
{
std::filesystem::path RootDirectory;
- uint32_t SmallBlobBlockStoreMaxBlockSize = 256 * 1024 * 1024;
- uint64_t SmallBlobBlockStoreMaxBlockEmbedSize = 1 * 1024 * 1024;
- uint32_t SmallBlobBlockStoreAlignement = 16;
- uint32_t MetadataBlockStoreMaxBlockSize = 64 * 1024 * 1024;
- uint32_t MetadataBlockStoreAlignement = 8;
- uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB
+ uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB
};
-class BuildStore : public GcReferencer, public GcReferenceLocker, public GcStorage
+class BuildStore : public GcReferencer, public GcReferenceLocker
{
public:
- explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc);
+ explicit BuildStore(const BuildStoreConfig& Config, GcManager& Gc, CidStore& BlobStore);
virtual ~BuildStore();
void PutBlob(const IoHash& BlobHashes, const IoBuffer& Payload);
@@ -44,7 +38,7 @@ public:
std::vector<BlobExistsResult> BlobsExists(std::span<const IoHash> BlobHashes);
- void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas);
+ void PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoBuffer> MetaDatas, WorkerThreadPool* OptionalWorkerPool);
std::vector<IoBuffer> GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* OptionalWorkerPool);
void Flush();
@@ -52,10 +46,8 @@ public:
struct StorageStats
{
uint64_t EntryCount = 0;
- uint64_t LargeBlobCount = 0;
- uint64_t LargeBlobBytes = 0;
- uint64_t SmallBlobCount = 0;
- uint64_t SmallBlobBytes = 0;
+ uint64_t BlobCount = 0;
+ uint64_t BlobBytes = 0;
uint64_t MetadataCount = 0;
uint64_t MetadataByteCount = 0;
};
@@ -86,23 +78,18 @@ private:
//////// GcReferenceLocker
virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override;
- //////// GcStorage
- virtual void ScrubStorage(ScrubContext& ScrubCtx) override;
- virtual GcStorageSize StorageSize() const override;
-
#pragma pack(push)
#pragma pack(1)
struct PayloadEntry
{
PayloadEntry() {}
- PayloadEntry(uint64_t Flags, uint64_t Size)
+ PayloadEntry(uint8_t Flags, uint64_t Size)
{
ZEN_ASSERT((Size & 0x00ffffffffffffffu) == Size);
- ZEN_ASSERT((Flags & (kTombStone | kStandalone)) == Flags);
+ ZEN_ASSERT((Flags & (kTombStone)) == Flags);
FlagsAndSize = (Size << 8) | Flags;
}
- static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
- static const uint8_t kStandalone = 0x20u; // This payload is stored as a standalone value
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
uint64_t FlagsAndSize = 0;
uint64_t GetSize() const { return FlagsAndSize >> 8; }
@@ -126,27 +113,47 @@ private:
struct MetadataEntry
{
- BlockStoreLocation Location; // 12 bytes
+ IoHash MetadataHash; // 20 bytes
+
+ MetadataEntry() {}
- ZenContentType ContentType = ZenContentType::kCOUNT; // 1 byte
- static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
- uint8_t Flags = 0; // 1 byte
+ MetadataEntry(const IoHash& Hash, uint64_t Size, ZenContentType ContentType, uint8_t Flags)
+ {
+ ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size);
+ ZEN_ASSERT((Flags & kTombStone) == Flags);
+ FlagsContentTypeAndSize = (Size << 16) | ((uint64_t)ContentType << 8) | Flags;
+ MetadataHash = Hash;
+ }
+
+ static const uint8_t kTombStone = 0x10u; // Represents a deleted key/value
+
+ uint64_t GetSize() const { return FlagsContentTypeAndSize >> 16; }
+ void SetSize(uint64_t Size)
+ {
+ ZEN_ASSERT((Size & 0x0000ffffffffffffu) == Size);
+ FlagsContentTypeAndSize = (Size << 16) | (FlagsContentTypeAndSize & 0xffff);
+ }
+
+ uint8_t GetFlags() const { return uint8_t(FlagsContentTypeAndSize & 0xff); }
+ void AddFlag(uint8_t Flag) { FlagsContentTypeAndSize |= Flag; }
+ void SetFlags(uint8_t Flags) { FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffffff00u) | Flags; }
+
+ ZenContentType GetContentType() const { return ZenContentType((FlagsContentTypeAndSize >> 8) & 0xff); }
+ void SetContentType(ZenContentType ContentType)
+ {
+ FlagsContentTypeAndSize = (FlagsContentTypeAndSize & 0xffffffffffff00ffu) | (uint16_t(ContentType) << 8);
+ }
- uint8_t Reserved1 = 0;
- uint8_t Reserved2 = 0;
+ uint64_t FlagsContentTypeAndSize = ((uint64_t)ZenContentType::kCOUNT << 8);
};
- static_assert(sizeof(MetadataEntry) == 16);
+ static_assert(sizeof(MetadataEntry) == 28);
struct MetadataDiskEntry
{
- MetadataEntry Entry; // 16 bytes
+ MetadataEntry Entry; // 28 bytes
IoHash BlobHash; // 20 bytes
- uint8_t Reserved1 = 0;
- uint8_t Reserved2 = 0;
- uint8_t Reserved3 = 0;
- uint8_t Reserved4 = 0;
};
- static_assert(sizeof(MetadataDiskEntry) == 40);
+ static_assert(sizeof(MetadataDiskEntry) == 48);
#pragma pack(pop)
@@ -206,17 +213,15 @@ private:
std::vector<BlobEntry> m_BlobEntries;
tsl::robin_map<IoHash, BlobIndex, IoHash::Hasher> m_BlobLookup;
- FileCasStrategy m_LargeBlobStore;
- CasContainerStrategy m_SmallBlobStore;
- BlockStore m_MetadataBlockStore;
+ CidStore& m_BlobStore;
TCasLogFile<PayloadDiskEntry> m_PayloadlogFile;
TCasLogFile<MetadataDiskEntry> m_MetadatalogFile;
uint64_t m_BlobLogFlushPosition = 0;
uint64_t m_MetaLogFlushPosition = 0;
- std::unique_ptr<HashSet> m_TrackedCacheKeys;
- std::atomic<uint64_t> m_LastAccessTimeUpdateCount;
+ std::unique_ptr<std::vector<IoHash>> m_TrackedBlobKeys;
+ std::atomic<uint64_t> m_LastAccessTimeUpdateCount;
friend class BuildStoreGcReferenceChecker;
friend class BuildStoreGcReferencePruner;
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index 1edca5050..a571d1d11 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -35,17 +35,19 @@ ParallelWork::~ParallelWork()
m_PendingWork.CountDown();
m_DispatchComplete = true;
}
- m_PendingWork.Wait();
- ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ const bool WaitSucceeded = m_PendingWork.Wait();
+ const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (!WaitSucceeded)
+ {
+ ZEN_ERROR("ParallelWork::~ParallelWork(): waiting for latch failed, pending work count at {}", RemainingWork);
+ }
if (RemainingWork != 0)
{
void* Frames[8];
uint32_t FrameCount = GetCallstack(2, 8, Frames);
CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
- ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}",
- RemainingWork,
- CallstackToString(Callstack, " "));
- FreeCallstack(Callstack);
+ auto _ = MakeGuard([Callstack]() { FreeCallstack(Callstack); });
+ ZEN_WARN("ParallelWork::~ParallelWork(): waited for outstanding work but pending work count is {} instead of 0", RemainingWork);
uint32_t WaitedMs = 0;
while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
@@ -53,20 +55,27 @@ ParallelWork::~ParallelWork()
Sleep(50);
WaitedMs += 50;
}
- RemainingWork = m_PendingWork.Remaining();
- if (RemainingWork != 0)
+ ptrdiff_t RemainingWorkAfterSafetyWait = m_PendingWork.Remaining();
+ if (RemainingWorkAfterSafetyWait != 0)
{
- ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork)
+ ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks failed, pending work count at {} after {}\n{}",
+ RemainingWork,
+ RemainingWorkAfterSafetyWait,
+ NiceLatencyNs(WaitedMs * 1000000u),
+ CallstackToString(Callstack, " "))
}
else
{
- ZEN_INFO("ParallelWork destructor safety wait succeeded");
+ ZEN_ERROR("ParallelWork::~ParallelWork(): safety wait for {} tasks completed after {}\n{}",
+ RemainingWork,
+ NiceLatencyNs(WaitedMs * 1000000u),
+ CallstackToString(Callstack, " "));
}
}
}
catch (const std::exception& Ex)
{
- ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what());
+ ZEN_ERROR("Exception in ParallelWork::~ParallelWork(): {}", Ex.what());
}
}
@@ -103,7 +112,16 @@ ParallelWork::Wait()
m_PendingWork.CountDown();
m_DispatchComplete = true;
- m_PendingWork.Wait();
+ const bool WaitSucceeded = m_PendingWork.Wait();
+ const ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (!WaitSucceeded)
+ {
+ ZEN_ERROR("ParallelWork::Wait(): waiting for latch failed, pending work count at {}", RemainingWork);
+ }
+ else if (RemainingWork != 0)
+ {
+ ZEN_ERROR("ParallelWork::Wait(): pending work count at {} after successful wait for latch", RemainingWork);
+ }
RethrowErrors();
}