aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2022-06-17 07:06:21 -0700
committer GitHub <[email protected]> 2022-06-17 07:06:21 -0700
commit c7e22a4ef1cce7103b9afbeec487461cb32f8dbe (patch)
tree 8b99d51bf496c96f82161c18fbdcfd5c6f8f31fd /zenserver
parent fixed merge mistake which caused a build error (diff)
download zen-0.1.4-pre6.tar.xz
zen-0.1.4-pre6.zip
Make cas storage a hidden implementation detail of CidStore (#130) v0.1.4-pre6 v0.1.4-pre5
- Bumped ZEN_SCHEMA_VERSION - CasStore is no longer a public API; it is hidden behind CidStore - Moved cas.h from public header folder - CidStore no longer maps from Cid -> Cas, we store entries in Cas under RawHash - CasStore now decompresses data to validate content (matching against RawHash) - CasChunkSet renamed to HashKeySet and put in separate header/cpp file - Disabled "Chunk" command for now as it relied on CAS being exposed as a service - Changed CAS http service to Cid http server - Moved "Run" command completely inside ZEN_WITH_EXEC_SERVICES define - Removed "cas.basic" test - Uncommented ".exec.basic" test and added return-skip at start of test - Moved ScrubContext to separate header file - Renamed CasGc to GcManager - Cleaned up configuration passing in cas store classes - Removed CAS stuff from GcContext and clarified naming in class - Removed migration code
Diffstat (limited to 'zenserver')
-rw-r--r-- zenserver/cache/structuredcache.cpp 18
-rw-r--r-- zenserver/cache/structuredcachestore.cpp 728
-rw-r--r-- zenserver/cache/structuredcachestore.h 10
-rw-r--r-- zenserver/cidstore.cpp (renamed from zenserver/casstore.cpp) 46
-rw-r--r-- zenserver/cidstore.h (renamed from zenserver/casstore.h) 17
-rw-r--r-- zenserver/compute/function.cpp 33
-rw-r--r-- zenserver/compute/function.h 5
-rw-r--r-- zenserver/projectstore.cpp 10
-rw-r--r-- zenserver/projectstore.h 4
-rw-r--r-- zenserver/testing/launch.cpp 26
-rw-r--r-- zenserver/testing/launch.h 6
-rw-r--r-- zenserver/upstream/hordecompute.cpp 12
-rw-r--r-- zenserver/upstream/upstreamapply.cpp 9
-rw-r--r-- zenserver/upstream/upstreamapply.h 4
-rw-r--r-- zenserver/upstream/upstreamcache.cpp 1
-rw-r--r-- zenserver/zenserver.cpp 35
16 files changed, 292 insertions, 672 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 45bbe062b..1f89e7362 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -16,7 +16,6 @@
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
-#include <zenstore/cas.h>
#include <zenutil/cache/cache.h>
//#include "cachekey.h"
@@ -26,6 +25,7 @@
#include "upstream/upstreamcache.h"
#include "upstream/zen.h"
#include "zenstore/cidstore.h"
+#include "zenstore/scrubcontext.h"
#include <algorithm>
#include <atomic>
@@ -859,7 +859,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
- ValidAttachments.emplace_back(InsertResult.DecompressedId);
+ ValidAttachments.emplace_back(Hash);
if (InsertResult.New)
{
@@ -1205,7 +1205,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
CompressedBuffer Chunk = Attachment->AsCompressedBinary();
CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
- ValidAttachments.emplace_back(InsertResult.DecompressedId);
+ ValidAttachments.emplace_back(ValueHash);
if (InsertResult.New)
{
@@ -2382,7 +2382,7 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
const uint64_t MissCount = m_CacheStats.MissCount;
const uint64_t TotalCount = HitCount + MissCount;
- const CasStoreSize CasSize = m_CidStore.CasSize();
+ const CidStoreSize CidSize = m_CidStore.TotalSize();
const GcStorageSize CacheSize = m_CacheStore.StorageSize();
Cbo.BeginObject("cache");
@@ -2400,12 +2400,12 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
m_UpstreamCache.GetStatus(Cbo);
Cbo.EndObject();
- Cbo.BeginObject("cas");
+ Cbo.BeginObject("cid");
Cbo.BeginObject("size");
- Cbo << "tiny" << CasSize.TinySize;
- Cbo << "small" << CasSize.SmallSize;
- Cbo << "large" << CasSize.LargeSize;
- Cbo << "total" << CasSize.TotalSize;
+ Cbo << "tiny" << CidSize.TinySize;
+ Cbo << "small" << CidSize.SmallSize;
+ Cbo << "large" << CidSize.LargeSize;
+ Cbo << "total" << CidSize.TotalSize;
Cbo.EndObject();
Cbo.EndObject();
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 4be33170c..4e7ad522d 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -18,6 +18,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include <xxhash.h>
@@ -66,67 +67,10 @@ namespace {
static_assert(sizeof(CacheBucketIndexHeader) == 32);
- struct LegacyDiskLocation
- {
- inline LegacyDiskLocation() = default;
-
- inline LegacyDiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
-
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; // Most significant bits of value size (lower 32 bits in LowerSize)
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; // Stored as a separate file
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull; // Serialized as compact binary
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; // Represents a deleted key/value
- static const uint64_t kCompressed = 0x1000'0000'0000'0000ull; // Stored in compressed buffer format
-
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
-
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
-
- if (IsFlagSet(LegacyDiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
-
- if (IsFlagSet(LegacyDiskLocation::kCompressed))
- {
- ContentType = ZenContentType::kCompressedBinary;
- }
-
- return ContentType;
- }
- inline uint64_t Flags() const { return OffsetAndFlags & kFlagsMask; }
-
- private:
- uint64_t OffsetAndFlags = 0;
- uint32_t LowerSize = 0;
- uint32_t IndexDataSize = 0;
- };
-
- struct LegacyDiskIndexEntry
- {
- IoHash Key;
- LegacyDiskLocation Location;
- };
-
#pragma pack(pop)
- static_assert(sizeof(LegacyDiskIndexEntry) == 36);
-
- const char* IndexExtension = ".uidx";
- const char* LogExtension = ".slog";
- const char* LegacyDataExtension = ".sobs";
+ const char* IndexExtension = ".uidx";
+ const char* LogExtension = ".slog";
std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName)
{
@@ -143,42 +87,6 @@ namespace {
return BucketDir / (BucketName + LogExtension);
}
- std::filesystem::path GetLegacyLogPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LogExtension);
- }
-
- std::filesystem::path GetLegacyDataPath(const std::filesystem::path& BucketDir)
- {
- return BucketDir / (std::string("zen") + LegacyDataExtension);
- }
-
- bool ValidateLegacyEntry(const LegacyDiskIndexEntry& Entry, std::string& OutReason)
- {
- if (Entry.Key == IoHash::Zero)
- {
- OutReason = fmt::format("Invalid hash key {}", Entry.Key.ToHexString());
- return false;
- }
- if (Entry.Location.Flags() & ~(LegacyDiskLocation::kStandaloneFile | LegacyDiskLocation::kStructured |
- LegacyDiskLocation::kTombStone | LegacyDiskLocation::kCompressed))
- {
- OutReason = fmt::format("Invalid flags {} for entry {}", Entry.Location.Flags(), Entry.Key.ToHexString());
- return false;
- }
- if (!Entry.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- return true;
- }
- uint64_t Size = Entry.Location.Size();
- if (Size == 0)
- {
- OutReason = fmt::format("Invalid size {} for entry {}", Size, Entry.Key.ToHexString());
- return false;
- }
- return true;
- }
-
bool ValidateEntry(const DiskIndexEntry& Entry, std::string& OutReason)
{
if (Entry.Key == IoHash::Zero)
@@ -262,7 +170,7 @@ SaveCompactBinaryObject(const fs::path& Path, const CbObject& Object)
WriteFile(Path, Object.GetBuffer().AsIoBuffer());
}
-ZenCacheNamespace::ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir)
+ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir)
: GcStorage(Gc)
, GcContributor(Gc)
, m_RootDir(RootDir)
@@ -583,9 +491,25 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
std::vector<IoHash> BadHashes;
+ auto ValidateEntry = [](ZenContentType ContentType, IoBuffer Buffer) {
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All);
+ return Error == CbValidateError::None;
+ }
+ if (ContentType == ZenContentType::kCompressedBinary)
+ {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer)); !Compressed)
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
for (auto& Kv : m_CacheMap)
{
- if (Kv.first != IoHash::HashBuffer(Kv.second.Payload))
+ if (!ValidateEntry(Kv.second.Payload.GetContentType(), Kv.second.Payload))
{
BadHashes.push_back(Kv.first);
}
@@ -593,7 +517,7 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (!BadHashes.empty())
{
- Ctx.ReportBadCasChunks(BadHashes);
+ Ctx.ReportBadCidChunks(BadHashes);
}
}
@@ -891,229 +815,6 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(uint64_t SkipEntryCount)
return 0;
};
-uint64_t
-ZenCacheDiskLayer::CacheBucket::MigrateLegacyData(bool CleanSource)
-{
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
-
- if (!std::filesystem::is_regular_file(LegacyLogPath) || std::filesystem::file_size(LegacyLogPath) == 0)
- {
- return 0;
- }
-
- ZEN_INFO("migrating store {}", m_BucketDir / m_BucketName);
-
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
-
- uint64_t MigratedChunkCount = 0;
- uint32_t MigratedBlockCount = 0;
- Stopwatch MigrationTimer;
- uint64_t TotalSize = 0;
- const auto _ = MakeGuard([&] {
- ZEN_INFO("migrated store '{}' to #{} chunks in #{} blocks in {} ({})",
- m_BucketDir / m_BucketName,
- MigratedChunkCount,
- MigratedBlockCount,
- NiceTimeSpanMs(MigrationTimer.GetElapsedTimeMs()),
- NiceBytes(TotalSize));
- });
-
- uint64_t BlockFileSize = 0;
- {
- BasicFile BlockFile;
- BlockFile.Open(LegacyDataPath, CleanSource ? BasicFile::Mode::kWrite : BasicFile::Mode::kRead);
- BlockFileSize = BlockFile.FileSize();
- }
-
- std::unordered_map<IoHash, LegacyDiskIndexEntry, IoHash::Hasher> LegacyDiskIndex;
- uint64_t InvalidEntryCount = 0;
-
- size_t BlockChunkCount = 0;
- TCasLogFile<LegacyDiskIndexEntry> LegacyCasLog;
- LegacyCasLog.Open(LegacyLogPath, CleanSource ? CasLogFile::Mode::kWrite : CasLogFile::Mode::kRead);
- {
- Stopwatch Timer;
- const auto __ = MakeGuard([&] {
- ZEN_INFO("read store '{}' legacy log containing #{} entries in {}",
- LegacyLogPath,
- LegacyDiskIndex.size(),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- });
- if (LegacyCasLog.Initialize())
- {
- LegacyDiskIndex.reserve(LegacyCasLog.GetLogCount());
- LegacyCasLog.Replay(
- [&](const LegacyDiskIndexEntry& Record) {
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kTombStone))
- {
- LegacyDiskIndex.erase(Record.Key);
- return;
- }
- std::string InvalidEntryReason;
- if (!ValidateLegacyEntry(Record, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LegacyLogPath, InvalidEntryReason);
- ++InvalidEntryCount;
- return;
- }
- if (m_Index.contains(Record.Key))
- {
- return;
- }
- LegacyDiskIndex[Record.Key] = Record;
- },
- 0);
-
- std::vector<IoHash> BadEntries;
- for (const auto& Entry : LegacyDiskIndex)
- {
- const LegacyDiskIndexEntry& Record(Entry.second);
- if (Record.Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- continue;
- }
- if (Record.Location.Offset() + Record.Location.Size() <= BlockFileSize)
- {
- BlockChunkCount++;
- continue;
- }
- ZEN_WARN("skipping invalid entry in '{}', reason: location is outside of file", LegacyLogPath);
- BadEntries.push_back(Entry.first);
- }
- for (const IoHash& BadHash : BadEntries)
- {
- LegacyDiskIndex.erase(BadHash);
- }
- InvalidEntryCount += BadEntries.size();
- }
- }
- if (InvalidEntryCount)
- {
- ZEN_WARN("found #{} invalid entries in '{}'", InvalidEntryCount, m_BucketDir / m_BucketName);
- }
-
- if (LegacyDiskIndex.empty())
- {
- LegacyCasLog.Close();
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return 0;
- }
-
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- CreateDirectories(LogPath.parent_path());
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kWrite);
-
- std::unordered_map<size_t, IoHash> ChunkIndexToChunkHash;
- std::vector<BlockStoreLocation> ChunkLocations;
- ChunkIndexToChunkHash.reserve(BlockChunkCount);
- ChunkLocations.reserve(BlockChunkCount);
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(LegacyDiskIndex.size() - BlockChunkCount);
-
- for (const auto& Entry : LegacyDiskIndex)
- {
- const IoHash& ChunkHash = Entry.first;
- const LegacyDiskLocation& Location = Entry.second.Location;
- if (Location.IsFlagSet(LegacyDiskLocation::kStandaloneFile))
- {
- uint8_t Flags = 0xff & (Location.Flags() >> 56);
- DiskLocation NewLocation = DiskLocation(Location.Size(), Flags);
- LogEntries.push_back({.Key = Entry.second.Key, .Location = NewLocation});
- continue;
- }
- size_t ChunkIndex = ChunkLocations.size();
- ChunkLocations.push_back({.BlockIndex = 0, .Offset = Location.Offset(), .Size = Location.Size()});
- ChunkIndexToChunkHash[ChunkIndex] = ChunkHash;
- TotalSize += Location.Size();
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
-
- m_BlockStore.Split(
- ChunkLocations,
- LegacyDataPath,
- m_BlocksBasePath,
- MaxBlockSize,
- BlockStoreDiskLocation::MaxBlockIndex + 1,
- m_PayloadAlignment,
- CleanSource,
- [this, &LegacyDiskIndex, &ChunkIndexToChunkHash, &LegacyCasLog, &CasLog, CleanSource, &MigratedBlockCount, &MigratedChunkCount](
- const BlockStore::MovedChunksArray& MovedChunks) {
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const BlockStoreLocation& NewLocation = Entry.second;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- uint8_t Flags = 0xff & (OldLocation.Flags() >> 56);
- LogEntries.push_back({.Key = ChunkHash, .Location = DiskLocation(NewLocation, m_PayloadAlignment, Flags)});
- }
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- m_Index.insert_or_assign(Entry.Key, IndexEntry(Entry.Location, GcClock::TickCount()));
- }
- CasLog.Append(LogEntries);
- CasLog.Flush();
- if (CleanSource)
- {
- std::vector<LegacyDiskIndexEntry> LegacyLogEntries;
- LegacyLogEntries.reserve(MovedChunks.size());
- for (const auto& Entry : MovedChunks)
- {
- size_t ChunkIndex = Entry.first;
- const IoHash& ChunkHash = ChunkIndexToChunkHash[ChunkIndex];
- const LegacyDiskIndexEntry& OldEntry = LegacyDiskIndex[ChunkHash];
- const LegacyDiskLocation& OldLocation = OldEntry.Location;
- LegacyDiskLocation NewLocation(OldLocation.Offset(),
- OldLocation.Size(),
- 0,
- OldLocation.Flags() | LegacyDiskLocation::kTombStone);
- LegacyLogEntries.push_back(LegacyDiskIndexEntry{.Key = ChunkHash, .Location = NewLocation});
- }
- LegacyCasLog.Append(LegacyLogEntries);
- LegacyCasLog.Flush();
- }
- MigratedBlockCount++;
- MigratedChunkCount += MovedChunks.size();
- });
-
- LegacyCasLog.Close();
- CasLog.Close();
-
- if (CleanSource)
- {
- // Older versions of ZenCacheDiskLayer expects the legacy files to exist if it can find
- // a manifest and crashes on startup if they don't.
- // In order to not break startup when switching back an older version, lets just reset
- // the legacy data files to zero length.
-
- BasicFile LegacyLog;
- LegacyLog.Open(LegacyLogPath, BasicFile::Mode::kTruncate);
- BasicFile LegacySobs;
- LegacySobs.Open(LegacyDataPath, BasicFile::Mode::kTruncate);
- }
- return MigratedChunkCount;
-}
-
void
ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool IsNew)
{
@@ -1123,23 +824,18 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_Index.clear();
- std::filesystem::path LegacyLogPath = GetLegacyLogPath(m_BucketDir);
- std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
- std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
+ std::filesystem::path LogPath = GetLogPath(m_BucketDir, m_BucketName);
+ std::filesystem::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName);
if (IsNew)
{
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(m_BucketDir);
- fs::remove(LegacyLogPath);
- fs::remove(LegacyDataPath);
fs::remove(LogPath);
fs::remove(IndexPath);
fs::remove_all(m_BlocksBasePath);
}
- uint64_t LogPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(LogPosition);
- uint64_t LegacyLogEntryCount = MigrateLegacyData(true);
+ uint64_t LogPosition = ReadIndexFile();
+ uint64_t LogEntryCount = ReadLog(LogPosition);
CreateDirectories(m_BucketDir);
@@ -1161,7 +857,7 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const fs::path& BucketDir, const bool Is
m_BlockStore.Initialize(m_BlocksBasePath, MaxBlockSize, BlockStoreDiskLocation::MaxBlockIndex + 1, KnownLocations);
- if (IsNew || ((LogEntryCount + LegacyLogEntryCount) > 0))
+ if (IsNew || LogEntryCount > 0)
{
MakeIndexSnapshot();
}
@@ -1309,6 +1005,7 @@ void
ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
{
std::vector<IoHash> BadKeys;
+ uint64_t ChunkCount{0}, ChunkBytes{0};
std::vector<BlockStoreLocation> ChunkLocations;
std::vector<IoHash> ChunkIndexToChunkHash;
@@ -1341,6 +1038,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
{
+ ++ChunkCount;
+ ChunkBytes += Loc.Size();
if (Loc.GetContentType() == ZenContentType::kBinary)
{
ExtendablePathBuilder<256> DataFilePath;
@@ -1381,6 +1080,8 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
}
const auto ValidateSmallChunk = [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
if (!Data)
{
@@ -1403,8 +1104,11 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
};
const auto ValidateLargeChunk = [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ++ChunkCount;
+ ChunkBytes += Size;
const IoHash& Hash = ChunkIndexToChunkHash[ChunkIndex];
- IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
+ // TODO: Add API to verify compressed buffer and possible structure data without having to memorymap the whole file
+ IoBuffer Buffer(IoBuffer::BorrowedFile, File.GetBasicFile().Handle(), Offset, Size);
if (!Buffer)
{
BadKeys.push_back(Hash);
@@ -1422,40 +1126,41 @@ ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx)
_.ReleaseNow();
- if (BadKeys.empty())
- {
- return;
- }
+ Ctx.ReportScrubbed(ChunkCount, ChunkBytes);
- ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
-
- if (Ctx.RunRecovery())
+ if (!BadKeys.empty())
{
- // Deal with bad chunks by removing them from our lookup map
-
- std::vector<DiskIndexEntry> LogEntries;
- LogEntries.reserve(BadKeys.size());
+ ZEN_ERROR("Scrubbing found #{} bad chunks in '{}'", BadKeys.size(), m_BucketDir / m_BucketName);
+ if (Ctx.RunRecovery())
{
- RwLock::ExclusiveLockScope __(m_IndexLock);
- for (const IoHash& BadKey : BadKeys)
- {
- // Log a tombstone and delete the in-memory index for the bad entry
+ // Deal with bad chunks by removing them from our lookup map
- const auto It = m_Index.find(BadKey);
- DiskLocation Location = It->second.Location;
- Location.Flags |= DiskLocation::kTombStone;
- LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
- m_Index.erase(BadKey);
+ std::vector<DiskIndexEntry> LogEntries;
+ LogEntries.reserve(BadKeys.size());
+
+ {
+ RwLock::ExclusiveLockScope __(m_IndexLock);
+ for (const IoHash& BadKey : BadKeys)
+ {
+ // Log a tombstone and delete the in-memory index for the bad entry
+ const auto It = m_Index.find(BadKey);
+ DiskLocation Location = It->second.Location;
+ Location.Flags |= DiskLocation::kTombStone;
+ LogEntries.push_back(DiskIndexEntry{.Key = BadKey, .Location = Location});
+ m_Index.erase(BadKey);
+ }
}
+ m_SlogFile.Append(LogEntries);
}
- m_SlogFile.Append(LogEntries);
}
// Let whomever it concerns know about the bad chunks. This could
// be used to invalidate higher level data structures more efficiently
// than a full validation pass might be able to do
- Ctx.ReportBadCasChunks(BadKeys);
+ Ctx.ReportBadCidChunks(BadKeys);
+
+ ZEN_INFO("cache bucket scrubbed: {} chunks ({})", ChunkCount, NiceBytes(ChunkBytes));
}
void
@@ -1517,7 +1222,7 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
{
if (Cids.size() > 1024)
{
- GcCtx.ContributeCids(Cids);
+ GcCtx.AddRetainedCids(Cids);
Cids.clear();
}
@@ -1552,8 +1257,8 @@ ZenCacheDiskLayer::CacheBucket::GatherReferences(GcContext& GcCtx)
}
}
- GcCtx.ContributeCids(Cids);
- GcCtx.ContributeCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
+ GcCtx.AddRetainedCids(Cids);
+ GcCtx.SetExpiredCacheKeys(m_BucketDir.string(), std::move(ExpiredKeys));
}
void
@@ -1601,7 +1306,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
std::span<const IoHash> ExpiredCacheKeys = GcCtx.ExpiredCacheKeys(m_BucketDir.string());
std::vector<IoHash> DeleteCacheKeys;
DeleteCacheKeys.reserve(ExpiredCacheKeys.size());
- GcCtx.FilterCas(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(ExpiredCacheKeys, [&](const IoHash& ChunkHash, bool Keep) {
if (Keep)
{
return;
@@ -1752,7 +1457,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ChunkLocations.reserve(TotalChunkCount);
ChunkIndexToChunkHash.reserve(TotalChunkCount);
- GcCtx.FilterCas(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
+ GcCtx.FilterCids(TotalChunkHashes, [&](const IoHash& ChunkHash, bool Keep) {
auto KeyIt = Index.find(ChunkHash);
const DiskLocation& DiskLocation = KeyIt->second.Location;
BlockStoreLocation Location = DiskLocation.GetBlockLocation(m_PayloadAlignment);
@@ -1836,7 +1541,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
},
[&]() { return GcCtx.CollectSmallObjects(); });
- GcCtx.DeletedCas(DeletedChunks);
+ GcCtx.AddDeletedCids(DeletedChunks);
}
void
@@ -2302,7 +2007,7 @@ ZenCacheDiskLayer::TotalSize() const
static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
+ZenCacheStore::ZenCacheStore(GcManager& Gc, const Configuration& Configuration)
: GcStorage(Gc)
, GcContributor(Gc)
, m_Gc(Gc)
@@ -2313,7 +2018,6 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
DirectoryContent DirContent;
GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
- std::vector<std::string> LegacyBuckets;
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
@@ -2323,33 +2027,17 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
continue;
}
- LegacyBuckets.push_back(DirName);
}
- ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size());
+ ZEN_INFO("Found #{} namespaces in '{}'", Namespaces.size(), m_Configuration.BasePath);
if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
{
// default (unspecified) and ue4-ddc namespace points to the same namespace instance
- ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
-
std::filesystem::path DefaultNamespaceFolder =
m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
CreateDirectories(DefaultNamespaceFolder);
-
- // Move any non-namespace folders into the default namespace folder
- for (const std::string& DirName : LegacyBuckets)
- {
- std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName;
- std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
- std::error_code Ec;
- std::filesystem::rename(LegacyFolder, NewPath, Ec);
- if (Ec)
- {
- ZEN_ERROR("Unable to move '{}' to '{}', reason '{}'", LegacyFolder, NewPath, Ec.message());
- }
- }
Namespaces.push_back(std::string(UE4DDCNamespaceName));
}
@@ -2537,7 +2225,7 @@ TEST_CASE("z$.store")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
@@ -2592,7 +2280,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() - 256);
@@ -2612,7 +2300,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2635,7 +2323,7 @@ TEST_CASE("z$.size")
GcStorageSize CacheSize;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(Zcs.DiskLayerThreshold() + 64);
@@ -2655,7 +2343,7 @@ TEST_CASE("z$.size")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const GcStorageSize SerializedSize = Zcs.StorageSize();
@@ -2680,7 +2368,7 @@ TEST_CASE("z$.gc")
ScopedTemporaryDirectory TempDir;
std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)};
- const auto CollectAndFilter = [](CasGc& Gc,
+ const auto CollectAndFilter = [](GcManager& Gc,
GcClock::TimePoint Time,
GcClock::Duration MaxDuration,
std::span<const IoHash> Cids,
@@ -2693,7 +2381,7 @@ TEST_CASE("z$.gc")
};
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "teardrinker"sv;
@@ -2730,7 +2418,7 @@ TEST_CASE("z$.gc")
// Expect timestamps to be serialized
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
std::vector<IoHash> Keep;
@@ -2751,7 +2439,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes standalone values")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "fortysixandtwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2799,7 +2487,7 @@ TEST_CASE("z$.gc")
SUBCASE("gc removes small objects")
{
ScopedTemporaryDirectory TempDir;
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
const auto Bucket = "rightintwo"sv;
const GcClock::TimePoint CurrentTime = GcClock::Now();
@@ -2848,154 +2536,6 @@ TEST_CASE("z$.gc")
}
}
-TEST_CASE("z$.legacyconversion")
-{
- ScopedTemporaryDirectory TempDir;
-
- uint64_t ChunkSizes[] = {2041,
- 1123,
- 1223,
- 1239,
- 341,
- 1412,
- 912,
- 774,
- 341,
- 431,
- 554,
- 1098,
- 2048,
- 339 + 64 * 1024,
- 561 + 64 * 1024,
- 16 + 64 * 1024,
- 16 + 64 * 1024,
- 2048,
- 2048};
- size_t ChunkCount = sizeof(ChunkSizes) / sizeof(uint64_t);
- size_t SingleBlockSize = 0;
- std::vector<IoBuffer> Chunks;
- Chunks.reserve(ChunkCount);
- for (uint64_t Size : ChunkSizes)
- {
- Chunks.push_back(testutils::CreateBinaryCacheValue(Size));
- SingleBlockSize += Size;
- }
-
- ZEN_UNUSED(SingleBlockSize);
-
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunkCount);
- for (const IoBuffer& Chunk : Chunks)
- {
- ChunkHashes.push_back(IoHash::HashBuffer(Chunk.Data(), Chunk.Size()));
- }
-
- CreateDirectories(TempDir.Path());
-
- const std::string Bucket = "rightintwo";
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
- const GcClock::TimePoint CurrentTime = GcClock::Now();
-
- for (size_t i = 0; i < ChunkCount; i++)
- {
- Zcs.Put(Bucket, ChunkHashes[i], {.Value = Chunks[i]});
- }
-
- std::vector<IoHash> KeepChunks;
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- KeepChunks.push_back(ChunkHashes[i]);
- }
- GcContext GcCtx(CurrentTime + std::chrono::hours(2));
- GcCtx.MaxCacheDuration(std::chrono::minutes(2));
- GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepChunks);
- Zcs.Flush();
- Gc.CollectGarbage(GcCtx);
- }
- std::filesystem::path BucketDir = TempDir.Path() / Bucket;
- std::filesystem::path BlocksBaseDir = BucketDir / "blocks";
-
- std::filesystem::path CasPath = BlockStore ::GetBlockPath(BlocksBaseDir, 1);
- std::filesystem::path LegacyDataPath = GetLegacyDataPath(BucketDir);
- std::filesystem::remove(LegacyDataPath);
- std::filesystem::rename(CasPath, LegacyDataPath);
-
- std::vector<DiskIndexEntry> LogEntries;
- std::filesystem::path IndexPath = GetIndexPath(BucketDir, Bucket);
- if (std::filesystem::is_regular_file(IndexPath))
- {
- BasicFile ObjectIndexFile;
- ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead);
- uint64_t Size = ObjectIndexFile.FileSize();
- if (Size >= sizeof(CacheBucketIndexHeader))
- {
- uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry);
- CacheBucketIndexHeader Header;
- ObjectIndexFile.Read(&Header, sizeof(Header), 0);
- if (Header.Magic == CacheBucketIndexHeader::ExpectedMagic && Header.Version == CacheBucketIndexHeader::CurrentVersion &&
- Header.PayloadAlignment > 0 && Header.EntryCount == ExpectedEntryCount)
- {
- LogEntries.resize(Header.EntryCount);
- ObjectIndexFile.Read(LogEntries.data(), Header.EntryCount * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader));
- }
- }
- ObjectIndexFile.Close();
- std::filesystem::remove(IndexPath);
- }
-
- std::filesystem::path LogPath = GetLogPath(BucketDir, Bucket);
- {
- TCasLogFile<DiskIndexEntry> CasLog;
- CasLog.Open(LogPath, CasLogFile::Mode::kRead);
- LogEntries.reserve(CasLog.GetLogCount());
- CasLog.Replay([&](const DiskIndexEntry& Record) { LogEntries.push_back(Record); }, 0);
- }
- TCasLogFile<LegacyDiskIndexEntry> LegacyLog;
- std::filesystem::path LegacylogPath = GetLegacyLogPath(BucketDir);
- LegacyLog.Open(LegacylogPath, CasLogFile::Mode::kTruncate);
-
- for (const DiskIndexEntry& Entry : LogEntries)
- {
- uint64_t Size;
- uint64_t Offset;
- if (Entry.Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- Size = Entry.Location.Location.StandaloneSize;
- Offset = 0;
- }
- else
- {
- BlockStoreLocation Location = Entry.Location.GetBlockLocation(16);
- Size = Location.Size;
- Offset = Location.Offset;
- }
- LegacyDiskLocation LegacyLocation(Offset, Size, 0, static_cast<uint64_t>(Entry.Location.Flags) << 56);
- LegacyDiskIndexEntry LegacyEntry = {.Key = Entry.Key, .Location = LegacyLocation};
- LegacyLog.Append(LegacyEntry);
- }
- LegacyLog.Close();
-
- std::filesystem::remove_all(BlocksBaseDir);
- std::filesystem::remove(LogPath);
- std::filesystem::remove(IndexPath);
-
- {
- CasGc Gc;
- ZenCacheNamespace Zcs(Gc, TempDir.Path());
-
- for (size_t i = 0; i < ChunkCount; i += 2)
- {
- ZenCacheValue Value;
- CHECK(Zcs.Get(Bucket, ChunkHashes[i], Value));
- CHECK(ChunkHashes[i] == IoHash::HashBuffer(Value.Value));
- CHECK(!Zcs.Get(Bucket, ChunkHashes[i + 1], Value));
- }
- }
-}
-
TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
{
// for (uint32_t i = 0; i < 100; ++i)
@@ -3045,7 +2585,7 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
WorkerThreadPool ThreadPool(4);
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path());
{
@@ -3169,10 +2709,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
while (WorkCompleted < NewChunks.size() + Chunks.size())
@@ -3217,10 +2757,10 @@ TEST_CASE("z$.threadedinsert") // * doctest::skip(true))
GcContext GcCtx;
GcCtx.CollectSmallObjects(true);
- GcCtx.ContributeCas(KeepHashes);
+ GcCtx.AddRetainedCids(KeepHashes);
Zcs.CollectGarbage(GcCtx);
- CasChunkSet& Deleted = GcCtx.DeletedCas();
- Deleted.IterateChunks([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
+ const HashKeySet& Deleted = GcCtx.DeletedCids();
+ Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); });
}
}
{
@@ -3261,7 +2801,7 @@ TEST_CASE("z$.namespaces")
IoHash Key1;
IoHash Key2;
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3286,7 +2826,7 @@ TEST_CASE("z$.namespaces")
}
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
@@ -3346,7 +2886,7 @@ TEST_CASE("z$.drop.bucket")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket = "teardrinker"sv;
const auto Namespace = "mynamespace"sv;
@@ -3415,7 +2955,7 @@ TEST_CASE("z$.drop.namespace")
};
WorkerThreadPool Workers(1);
{
- CasGc Gc;
+ GcManager Gc;
ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
const auto Bucket1 = "teardrinker1"sv;
const auto Bucket2 = "teardrinker2"sv;
@@ -3480,7 +3020,7 @@ TEST_CASE("z$.blocked.disklayer.put")
return Writer.Save();
};
- CasGc Gc;
+ GcManager Gc;
ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
CbObject CacheValue = CreateCacheValue(64 * 1024 + 64);
@@ -3517,6 +3057,96 @@ TEST_CASE("z$.blocked.disklayer.put")
CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
}
+TEST_CASE("z$.scrub")
+{
+    // Fills a cache namespace and a CidStore with structured and unstructured records,
+    // scrubs both stores, and checks that every stored chunk was visited and none was
+    // reported as bad.
+    ScopedTemporaryDirectory TempDir;
+
+    using namespace testutils;
+
+    // A cache record payload together with the compressed attachments it references.
+    // Attachments stays empty for unstructured records.
+    struct CacheRecord
+    {
+        IoBuffer                      Record;
+        std::vector<CompressedBuffer> Attachments;
+    };
+
+    // Builds one record for Bucket/Key.
+    //  - Structured: a compact binary object holding the key plus one binary attachment
+    //    reference per entry in AttachmentSizes; the compressed attachment payloads are
+    //    returned separately in Result.Attachments.
+    //  - Unstructured: a single flat buffer containing "<bucket>:<key hex>" (including the
+    //    terminating NUL) followed by the raw attachment bytes inlined back to back.
+    auto CreateCacheRecord = [](bool Structured, std::string_view Bucket, const IoHash& Key, const std::vector<size_t>& AttachmentSizes) {
+        CacheRecord Result;
+        if (Structured)
+        {
+            Result.Attachments.resize(AttachmentSizes.size());
+            CbObjectWriter Record;
+            Record.BeginObject("Key"sv);
+            {
+                Record << "Bucket"sv << Bucket;
+                Record << "Hash"sv << Key;
+            }
+            Record.EndObject();
+            for (size_t Index = 0; Index < AttachmentSizes.size(); Index++)
+            {
+                IoBuffer         AttachmentData           = CreateBinaryCacheValue(AttachmentSizes[Index]);
+                CompressedBuffer CompressedAttachmentData = CompressedBuffer::Compress(SharedBuffer(AttachmentData));
+                // Attachments are referenced by the hash of their uncompressed (raw) content
+                Record.AddBinaryAttachment(fmt::format("attachment-{}", Index), IoHash::FromBLAKE3(CompressedAttachmentData.GetRawHash()));
+                Result.Attachments[Index] = CompressedAttachmentData;
+            }
+            Result.Record = Record.Save().GetBuffer().AsIoBuffer();
+            Result.Record.SetContentType(ZenContentType::kCbObject);
+        }
+        else
+        {
+            std::string RecordData = fmt::format("{}:{}", Bucket, Key.ToHexString());
+            // +1 keeps the string's NUL terminator in the payload
+            size_t TotalSize = RecordData.length() + 1;
+            for (size_t AttachmentSize : AttachmentSizes)
+            {
+                TotalSize += AttachmentSize;
+            }
+            Result.Record = IoBuffer(TotalSize);
+            char* DataPtr = (char*)Result.Record.MutableData();
+            memcpy(DataPtr, RecordData.c_str(), RecordData.length() + 1);
+            DataPtr += RecordData.length() + 1;
+            for (size_t AttachmentSize : AttachmentSizes)
+            {
+                IoBuffer AttachmentData = CreateBinaryCacheValue(AttachmentSize);
+                memcpy(DataPtr, AttachmentData.GetData(), AttachmentData.GetSize());
+                DataPtr += AttachmentData.GetSize();
+            }
+        }
+        return Result;
+    };
+
+    GcManager         Gc;
+    CidStore          CidStore(Gc);
+    ZenCacheNamespace Zcs(Gc, TempDir.Path() / "cache");
+    // NOTE(review): the attachment sizes below fall on both sides of these thresholds,
+    // presumably to exercise each storage strategy - confirm against CidStore.
+    CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+    CidStore.Initialize(CidConfig);
+
+    // Creates one record per entry in Cids, stores it in bucket BucketName of the cache
+    // namespace, and adds any attachments of the record to the CidStore.
+    auto CreateRecords =
+        [&](bool IsStructured, std::string_view BucketName, const std::vector<IoHash>& Cids, const std::vector<size_t>& AttachmentSizes) {
+            for (const IoHash& Cid : Cids)
+            {
+                CacheRecord Record = CreateCacheRecord(IsStructured, BucketName, Cid, AttachmentSizes);
+                // Store under the bucket the record was built for, not a hard-coded name,
+                // so the record content and its location always agree
+                Zcs.Put(BucketName, Cid, {.Value = Record.Record});
+                for (const CompressedBuffer& Attachment : Record.Attachments)
+                {
+                    CidStore.AddChunk(Attachment);
+                }
+            }
+        };
+
+    std::vector<size_t> AttachmentSizes = {16, 1000, 2000, 4000, 8000, 64000, 80000};
+
+    std::vector<IoHash> UnstructuredCids{CreateKey(4), CreateKey(5), CreateKey(6)};
+    CreateRecords(false, "mybucket"sv, UnstructuredCids, AttachmentSizes);
+
+    std::vector<IoHash> StructuredCids{CreateKey(1), CreateKey(2), CreateKey(3)};
+    CreateRecords(true, "mybucket"sv, StructuredCids, AttachmentSizes);
+
+    ScrubContext ScrubCtx;
+    Zcs.Scrub(ScrubCtx);
+    CidStore.Scrub(ScrubCtx);
+    // Expected count: one chunk per structured record, one per structured attachment
+    // (only structured records add chunks to the CidStore), and one per unstructured record
+    CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size());
+    CHECK(ScrubCtx.BadCids().GetSize() == 0);
+}
+
#endif
void
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index ea33a3c00..b81e44835 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -9,7 +9,6 @@
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <zenstore/blockstore.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
@@ -27,8 +26,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
class PathBuilderBase;
-class CasGc;
+class GcManager;
class ZenCacheTracker;
+class ScrubContext;
/******************************************************************************
@@ -327,7 +327,7 @@ private:
class ZenCacheNamespace final : public RefCounted, public GcStorage, public GcContributor
{
public:
- ZenCacheNamespace(CasGc& Gc, const std::filesystem::path& RootDir);
+ ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir);
~ZenCacheNamespace();
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -369,7 +369,7 @@ public:
bool AllowAutomaticCreationOfNamespaces = true;
};
- ZenCacheStore(CasGc& Gc, const Configuration& Configuration);
+ ZenCacheStore(GcManager& Gc, const Configuration& Configuration);
~ZenCacheStore();
bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
@@ -393,7 +393,7 @@ private:
NamespaceMap m_Namespaces;
std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
- CasGc& m_Gc;
+ GcManager& m_Gc;
Configuration m_Configuration;
};
diff --git a/zenserver/casstore.cpp b/zenserver/cidstore.cpp
index 872a40df8..5de347a17 100644
--- a/zenserver/casstore.cpp
+++ b/zenserver/cidstore.cpp
@@ -1,23 +1,25 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include "casstore.h"
+#include "cidstore.h"
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zenstore/cidstore.h>
#include <gsl/gsl-lite.hpp>
namespace zen {
-HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
+HttpCidService::HttpCidService(CidStore& Store) : m_CidStore(Store)
{
- m_Router.AddPattern("cas", "([0-9A-Fa-f]{40})");
+ m_Router.AddPattern("cid", "([0-9A-Fa-f]{40})");
m_Router.RegisterRoute(
- "{cas}",
+ "{cid}",
[this](HttpRouterRequest& Req) {
IoHash Hash = IoHash::FromHexString(Req.GetCapture(1));
- ZEN_DEBUG("CAS request for {}", Hash);
+ ZEN_DEBUG("CID request for {}", Hash);
HttpServerRequest& ServerRequest = Req.ServerRequest();
@@ -26,7 +28,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
case HttpVerb::kGet:
case HttpVerb::kHead:
{
- if (IoBuffer Value = m_CasStore.FindChunk(Hash))
+ if (IoBuffer Value = m_CidStore.FindChunkByCid(Hash))
{
return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
@@ -37,8 +39,14 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ IoBuffer Payload = ServerRequest.ReadPayload();
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (!Compressed)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::UnsupportedMediaType);
+ }
+
+ IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
// URI hash must match content hash
if (PayloadHash != Hash)
@@ -46,7 +54,7 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
return ServerRequest.WriteResponse(HttpResponseCode::BadRequest);
}
- m_CasStore.InsertChunk(Payload, PayloadHash);
+ m_CidStore.AddChunk(Compressed);
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -60,13 +68,13 @@ HttpCasService::HttpCasService(CasStore& Store) : m_CasStore(Store)
}
const char*
-HttpCasService::BaseUri() const
+HttpCidService::BaseUri() const
{
- return "/cas/";
+ return "/cid/";
}
void
-HttpCasService::HandleRequest(zen::HttpServerRequest& Request)
+HttpCidService::HandleRequest(zen::HttpServerRequest& Request)
{
if (Request.RelativeUri().empty())
{
@@ -77,12 +85,18 @@ HttpCasService::HandleRequest(zen::HttpServerRequest& Request)
case HttpVerb::kPut:
case HttpVerb::kPost:
{
- IoBuffer Payload = Request.ReadPayload();
- IoHash PayloadHash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+ IoBuffer Payload = Request.ReadPayload();
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (!Compressed)
+ {
+ return Request.WriteResponse(HttpResponseCode::UnsupportedMediaType);
+ }
+
+ IoHash PayloadHash = IoHash::FromBLAKE3(Compressed.GetRawHash());
- ZEN_DEBUG("CAS POST request for {} ({} bytes)", PayloadHash, Payload.Size());
+ ZEN_DEBUG("CID POST request for {} ({} bytes)", PayloadHash, Payload.Size());
- auto InsertResult = m_CasStore.InsertChunk(Payload, PayloadHash);
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
if (InsertResult.New)
{
diff --git a/zenserver/casstore.h b/zenserver/cidstore.h
index 4ca6908b5..8e7832b35 100644
--- a/zenserver/casstore.h
+++ b/zenserver/cidstore.h
@@ -3,31 +3,32 @@
#pragma once
#include <zenhttp/httpserver.h>
-#include <zenstore/cas.h>
namespace zen {
/**
- * Simple CAS store HTTP endpoint
+ * Simple CID store HTTP endpoint
*
* Note that since this does not end up pinning any of the chunks it's only really useful for a small subset of use cases where you know a
- * chunk exists in the underlying CAS store. Thus it's mainly useful for internal use when communicating between Zen store instances
+ * chunk exists in the underlying CID store. Thus it's mainly useful for internal use when communicating between Zen store instances
*
- * Using this interface for adding CAS chunks makes little sense except for testing purposes as garbage collection may reap anything you add
+ * Using this interface for adding CID chunks makes little sense except for testing purposes as garbage collection may reap anything you add
* before anything ever gets to access it
*/
-class HttpCasService : public HttpService
+class CidStore;
+
+class HttpCidService : public HttpService
{
public:
- explicit HttpCasService(CasStore& Store);
- ~HttpCasService() = default;
+ explicit HttpCidService(CidStore& Store);
+ ~HttpCidService() = default;
virtual const char* BaseUri() const override;
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
private:
- CasStore& m_CasStore;
+ CidStore& m_CidStore;
HttpRequestRouter m_Router;
};
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp
index 171c67a6e..d7316ac64 100644
--- a/zenserver/compute/function.cpp
+++ b/zenserver/compute/function.cpp
@@ -17,7 +17,6 @@
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
# include <zencore/scopeguard.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include <span>
@@ -26,25 +25,22 @@ using namespace std::literals;
namespace zen {
-HttpFunctionService::HttpFunctionService(CasStore& Store,
- CidStore& InCidStore,
+HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
const CloudCacheClientOptions& ComputeOptions,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& ComputeAuthConfig,
const UpstreamAuthConfig& StorageAuthConfig,
AuthMgr& Mgr)
: m_Log(logging::Get("apply"))
-, m_CasStore(Store)
, m_CidStore(InCidStore)
{
- m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
+ m_UpstreamApply = UpstreamApply::Create({}, m_CidStore);
InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
ComputeAuthConfig,
StorageOptions,
StorageAuthConfig,
- m_CasStore,
m_CidStore,
Mgr);
m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
@@ -99,18 +95,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
// Determine which pieces are missing and need to be transmitted to populate CAS
- CasChunkSet ChunkSet;
+ HashKeySet ChunkSet;
FunctionSpec.IterateAttachments([&](CbFieldView Field) {
const IoHash Hash = Field.AsHash();
- ChunkSet.AddChunkToSet(Hash);
+ ChunkSet.AddHashToSet(Hash);
});
 // Worker payloads are stored as compressed buffers in the CID store,
 // so filter the requested hashes against the CID store; whatever is
 // left in the set afterwards still needs to be transmitted
- m_CasStore.FilterChunks(ChunkSet);
+ m_CidStore.FilterChunks(ChunkSet);
if (ChunkSet.IsEmpty())
{
@@ -127,7 +123,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
CbObjectWriter ResponseWriter;
ResponseWriter.BeginArray("need");
- ChunkSet.IterateChunks([&](const IoHash& Hash) {
+ ChunkSet.IterateHashes([&](const IoHash& Hash) {
ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
ResponseWriter.AddHash(Hash);
@@ -159,25 +155,18 @@ HttpFunctionService::HttpFunctionService(CasStore& Store,
{
ZEN_ASSERT(Attachment.IsCompressedBinary());
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
- SharedBuffer Decompressed = DataView.Decompress();
- const uint64_t DecompressedSize = DataView.GetRawSize();
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
ZEN_UNUSED(DataHash);
-
- TotalAttachmentBytes += DecompressedSize;
+ TotalAttachmentBytes += Buffer.GetCompressedSize();
++AttachmentCount;
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them
-
- const CasStore::InsertResult InsertResult =
- m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash()));
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer);
if (InsertResult.New)
{
- TotalNewBytes += DecompressedSize;
+ TotalNewBytes += Buffer.GetCompressedSize();
++NewAttachmentCount;
}
}
diff --git a/zenserver/compute/function.h b/zenserver/compute/function.h
index efabe96ee..650cee757 100644
--- a/zenserver/compute/function.h
+++ b/zenserver/compute/function.h
@@ -20,7 +20,6 @@
namespace zen {
-class CasStore;
class CidStore;
class UpstreamApply;
class CloudCacheClient;
@@ -35,8 +34,7 @@ struct CloudCacheClientOptions;
class HttpFunctionService : public HttpService
{
public:
- HttpFunctionService(CasStore& Store,
- CidStore& InCidStore,
+ HttpFunctionService(CidStore& InCidStore,
const CloudCacheClientOptions& ComputeOptions,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& ComputeAuthConfig,
@@ -52,7 +50,6 @@ private:
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
- CasStore& m_CasStore;
CidStore& m_CidStore;
std::unique_ptr<UpstreamApply> m_UpstreamApply;
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 1853941ed..e42704ccf 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -16,8 +16,8 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/caslog.h>
+#include <zenstore/scrubcontext.h>
#include "config.h"
@@ -350,7 +350,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
Hashes.push_back(Kv.second);
}
- GcCtx.ContributeCids(Hashes);
+ GcCtx.AddRetainedCids(Hashes);
Hashes.clear();
@@ -359,7 +359,7 @@ ProjectStore::Oplog::GatherReferences(GcContext& GcCtx)
Hashes.push_back(Kv.second);
}
- GcCtx.ContributeCids(Hashes);
+ GcCtx.AddRetainedCids(Hashes);
}
bool
@@ -872,7 +872,7 @@ ProjectStore::Project::GatherReferences(GcContext& GcCtx)
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc)
+ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
: GcContributor(Gc)
, m_Log(zen::logging::Get("project"))
, m_CidStore(Store)
@@ -1482,7 +1482,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
const IoHash FileHash = Entry.AsHash();
- if (!m_CidStore.FindChunkByCid(FileHash))
+ if (!m_CidStore.ContainsChunk(FileHash))
{
ZEN_DEBUG("prep - NEED: {}", FileHash);
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index 6a8730ee2..a15556cfa 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -6,8 +6,6 @@
#include <zencore/uid.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpserver.h>
-#include <zenstore/cas.h>
-#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
@@ -65,7 +63,7 @@ class ProjectStore : public RefCounted, public GcContributor
struct OplogStorage;
public:
- ProjectStore(CidStore& Store, std::filesystem::path BasePath, CasGc& Gc);
+ ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc);
~ProjectStore();
struct Project;
diff --git a/zenserver/testing/launch.cpp b/zenserver/testing/launch.cpp
index 1236e6adb..0e46fff94 100644
--- a/zenserver/testing/launch.cpp
+++ b/zenserver/testing/launch.cpp
@@ -6,13 +6,14 @@
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
+# include <zencore/compress.h>
# include <zencore/filesystem.h>
# include <zencore/fmtutils.h>
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
# include <zencore/logging.h>
# include <zencore/windows.h>
-# include <zenstore/cas.h>
+# include <zenstore/cidstore.h>
ZEN_THIRD_PARTY_INCLUDES_START
# include <AccCtrl.h>
@@ -322,9 +323,9 @@ SandboxedJob::SpawnJob(std::filesystem::path ExePath)
////////////////////////////////////////////////////////////////////////////////
-HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir)
+HttpLaunchService::HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir)
: m_Log(logging::Get("exec"))
-, m_CasStore(Store)
+, m_CidStore(Store)
, m_SandboxPath(SandboxBaseDir)
{
m_Router.AddPattern("job", "([[:digit:]]+)");
@@ -402,7 +403,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
const IoHash FileHash = Ob["hash"sv].AsHash();
- if (!m_CasStore.FindChunk(FileHash))
+ if (!m_CidStore.FindChunkByCid(FileHash))
{
ZEN_DEBUG("NEED: {} {} {}", FileHash, Ob["file"sv].AsString(), Ob["size"sv].AsUInt64());
@@ -465,7 +466,7 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
const IoHash FileHash = Ob["hash"sv].AsHash();
uint64_t FileSize = Ob["size"sv].AsUInt64();
- if (IoBuffer Chunk = m_CasStore.FindChunk(FileHash); !Chunk)
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(FileHash); !Chunk)
{
ZEN_DEBUG("MISSING: {} {} {}", FileHash, FileName, FileSize);
AllOk = false;
@@ -476,9 +477,18 @@ HttpLaunchService::HttpLaunchService(CasStore& Store, const std::filesystem::pat
{
std::filesystem::path FullPath = SandboxDir / FileName;
- const IoBuffer* Chunks[] = {&Chunk};
-
- zen::WriteFile(FullPath, Chunks, 1);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CompositeBuffer CompositeBuffer = Compressed.DecompressToComposite();
+ std::span<const SharedBuffer> Segments = CompositeBuffer.GetSegments();
+ std::vector<IoBuffer> Chunks(Segments.size());
+ std::vector<IoBuffer*> ChunkPtrs(Segments.size());
+ for (size_t Index = 0; Index < Segments.size(); ++Index)
+ {
+ Chunks[Index] = std::move(Segments[Index].AsIoBuffer());
+ ChunkPtrs[Index] = &Chunks[Index];
+ }
+
+ zen::WriteFile(FullPath, ChunkPtrs.data(), ChunkPtrs.size());
}
}
diff --git a/zenserver/testing/launch.h b/zenserver/testing/launch.h
index 6fd3e39ae..f44618bfb 100644
--- a/zenserver/testing/launch.h
+++ b/zenserver/testing/launch.h
@@ -17,7 +17,7 @@
namespace zen {
-class CasStore;
+class CidStore;
/**
* Process launcher for test executables
@@ -25,7 +25,7 @@ class CasStore;
class HttpLaunchService : public HttpService
{
public:
- HttpLaunchService(CasStore& Store, const std::filesystem::path& SandboxBaseDir);
+ HttpLaunchService(CidStore& Store, const std::filesystem::path& SandboxBaseDir);
~HttpLaunchService();
virtual const char* BaseUri() const override;
@@ -36,7 +36,7 @@ private:
spdlog::logger& m_Log;
HttpRequestRouter m_Router;
- CasStore& m_CasStore;
+ CidStore& m_CidStore;
std::filesystem::path m_SandboxPath;
std::atomic<int> m_SandboxCount{0};
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 38c798569..22b06d9c4 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -17,7 +17,6 @@
# include <zencore/timer.h>
# include <zencore/workthreadpool.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include <auth/authmgr.h>
@@ -49,11 +48,9 @@ namespace detail {
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr)
: m_Log(logging::Get("upstream-apply"))
- , m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_AuthMgr(Mgr)
{
@@ -341,7 +338,7 @@ namespace detail {
}
else
{
- DataBuffer = m_CasStore.FindChunk(Hash);
+ DataBuffer = m_CidStore.FindChunkByCid(Hash);
if (!DataBuffer)
{
Log().warn("Put blob FAILED, input chunk '{}' missing", Hash);
@@ -676,7 +673,6 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- CasStore& m_CasStore;
CidStore& m_CidStore;
AuthMgr& m_AuthMgr;
std::string m_DisplayName;
@@ -1218,7 +1214,7 @@ namespace detail {
const IoHash ChunkId = FileEntry["hash"sv].AsHash();
const uint64_t Size = FileEntry["size"sv].AsUInt64();
- if (!m_CasStore.ContainsChunk(ChunkId))
+ if (!m_CidStore.ContainsChunk(ChunkId))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
return false;
@@ -1443,7 +1439,6 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr)
{
@@ -1451,11 +1446,10 @@ UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& Comput
ComputeAuthConfig,
StorageOptions,
StorageAuthConfig,
- CasStore,
CidStore,
Mgr);
}
} // namespace zen
-#endif // ZEN_WITH_COMPUTE_SERVICES \ No newline at end of file
+#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index c397bb141..c719b225d 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -11,7 +11,6 @@
# include <zencore/timer.h>
# include <zencore/workthreadpool.h>
-# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
# include "diag/logging.h"
@@ -77,10 +76,9 @@ struct UpstreamApplyStats
class UpstreamApplyImpl final : public UpstreamApply
{
public:
- UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore)
: m_Log(logging::Get("upstream-apply"))
, m_Options(Options)
- , m_CasStore(CasStore)
, m_CidStore(CidStore)
, m_Stats(Options.StatsEnabled)
, m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
@@ -429,7 +427,6 @@ private:
spdlog::logger& m_Log;
UpstreamApplyOptions m_Options;
- CasStore& m_CasStore;
CidStore& m_CidStore;
UpstreamApplyStats m_Stats;
UpstreamApplyTasks m_ApplyTasks;
@@ -452,9 +449,9 @@ UpstreamApply::IsHealthy() const
}
std::unique_ptr<UpstreamApply>
-UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore)
{
- return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore);
+ return std::make_unique<UpstreamApplyImpl>(Options, CidStore);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index 1deaf00a5..4a095be6c 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -18,7 +18,6 @@
namespace zen {
class AuthMgr;
-class CasStore;
class CbObjectWriter;
class CidStore;
class CloudCacheTokenProvider;
@@ -153,7 +152,6 @@ public:
const UpstreamAuthConfig& ComputeAuthConfig,
const CloudCacheClientOptions& StorageOptions,
const UpstreamAuthConfig& StorageAuthConfig,
- CasStore& CasStore,
CidStore& CidStore,
AuthMgr& Mgr);
};
@@ -186,7 +184,7 @@ public:
virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
- static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+ static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CidStore& CidStore);
};
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 98b4439c7..7d1f72004 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -15,7 +15,6 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
#include <auth/authmgr.h>
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 801a98523..31ca8851a 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -17,8 +17,8 @@
#include <zenhttp/httpserver.h>
#include <zenhttp/websocket.h>
#include <zenstore/basicfile.h>
-#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include <zenstore/scrubcontext.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_PLATFORM_WINDOWS
@@ -56,7 +56,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
//////////////////////////////////////////////////////////////////////////
-#include "casstore.h"
#include "config.h"
#include "diag/logging.h"
@@ -103,6 +102,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "auth/authservice.h"
#include "cache/structuredcache.h"
#include "cache/structuredcachestore.h"
+#include "cidstore.h"
#include "compute/function.h"
#include "diag/diagsvcs.h"
#include "experimental/usnjournal.h"
@@ -253,17 +253,16 @@ public:
ZEN_INFO("initializing storage");
- zen::CasStoreConfiguration Config;
+ zen::CidStoreConfiguration Config;
Config.RootDirectory = m_DataRoot / "cas";
- m_CasStore->Initialize(Config);
-
- m_CidStore = std::make_unique<zen::CidStore>(*m_CasStore, m_DataRoot / "cid");
- m_CasGc.SetCidStore(m_CidStore.get());
+ m_CidStore = std::make_unique<zen::CidStore>(m_GcManager);
+ m_CidStore->Initialize(Config);
+ m_CidService.reset(new zen::HttpCidService{*m_CidStore});
ZEN_INFO("instantiating project service");
- m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_CasGc);
+ m_ProjectStore = new zen::ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager);
m_HttpProjectService.reset(new zen::HttpProjectService{*m_CidStore, m_ProjectStore});
#if ZEN_WITH_EXEC_SERVICES
@@ -274,7 +273,7 @@ public:
std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
zen::CreateDirectories(SandboxDir);
- m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+ m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CidStore, SandboxDir);
}
else
{
@@ -328,7 +327,7 @@ public:
m_Http->RegisterService(*m_HttpProjectService);
}
- m_Http->RegisterService(m_CasService);
+ m_Http->RegisterService(*m_CidService);
#if ZEN_WITH_EXEC_SERVICES
if (ServerOptions.ExecServiceEnabled)
@@ -522,7 +521,6 @@ public:
ZEN_INFO("Storage validation STARTING");
ScrubContext Ctx;
- m_CasStore->Scrub(Ctx);
m_CidStore->Scrub(Ctx);
m_ProjectStore->Scrub(Ctx);
m_StructuredCacheService->Scrub(Ctx);
@@ -538,9 +536,6 @@ public:
void Flush()
{
- if (m_CasStore)
- m_CasStore->Flush();
-
if (m_CidStore)
m_CidStore->Flush();
@@ -602,14 +597,13 @@ private:
std::unique_ptr<zen::HttpAuthService> m_AuthService;
zen::HttpStatusService m_StatusService;
zen::HttpStatsService m_StatsService;
- zen::CasGc m_CasGc;
- zen::GcScheduler m_GcScheduler{m_CasGc};
- std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore(m_CasGc)};
+ zen::GcManager m_GcManager;
+ zen::GcScheduler m_GcScheduler{m_GcManager};
std::unique_ptr<zen::CidStore> m_CidStore;
std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
zen::HttpTestService m_TestService;
zen::HttpTestingService m_TestingService;
- zen::HttpCasService m_CasService{*m_CasStore};
+ std::unique_ptr<zen::HttpCidService> m_CidService;
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::UpstreamCache> m_UpstreamCache;
@@ -745,7 +739,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ZEN_INFO("instantiating structured cache service");
m_CacheStore = std::make_unique<ZenCacheStore>(
- m_CasGc,
+ m_GcManager,
ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true});
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
@@ -873,8 +867,7 @@ ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
.OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
.AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
- m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore,
- *m_CidStore,
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CidStore,
ComputeOptions,
StorageOptions,
ComputeAuthConfig,