aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
committerLiam Mitchell <[email protected]>2025-08-21 23:58:51 +0000
commit33209bd6931f49362dfc2d62c6cb6b87a42c99e1 (patch)
treecfc7914634088b3f4feac2d4cec0b5650dfdcc3c /src/zenserver
parentFix changelog merge issues (diff)
parentavoid new in static IoBuffer (#472) (diff)
downloadzen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.tar.xz
zen-33209bd6931f49362dfc2d62c6cb6b87a42c99e1.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/admin/admin.cpp29
-rw-r--r--src/zenserver/admin/admin.h12
-rw-r--r--src/zenserver/buildstore/httpbuildstore.cpp22
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp420
-rw-r--r--src/zenserver/cache/httpstructuredcache.h7
-rw-r--r--src/zenserver/config.cpp13
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/frontend/html.zipbin161002 -> 161185 bytes
-rw-r--r--src/zenserver/frontend/html/indexer/worker.js46
-rw-r--r--src/zenserver/frontend/html/pages/entry.js36
-rw-r--r--src/zenserver/main.cpp17
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp618
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.h5
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp167
-rw-r--r--src/zenserver/projectstore/httpprojectstore.h7
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp4
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h3
-rw-r--r--src/zenserver/projectstore/projectstore.cpp200
-rw-r--r--src/zenserver/projectstore/projectstore.h13
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp38
-rw-r--r--src/zenserver/upstream/upstreamcache.h3
-rw-r--r--src/zenserver/zenserver.cpp100
-rw-r--r--src/zenserver/zenserver.h28
23 files changed, 1047 insertions, 742 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp
index 73166e608..8c2e6d771 100644
--- a/src/zenserver/admin/admin.cpp
+++ b/src/zenserver/admin/admin.cpp
@@ -17,14 +17,11 @@
# include <mimalloc.h>
#endif
-#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
-#include <zenstore/buildstore/buildstore.h>
#include <zenstore/cache/structuredcachestore.h>
#include <zenutil/workerpools.h>
#include "config.h"
-#include "projectstore/projectstore.h"
#include <chrono>
@@ -104,17 +101,13 @@ GetStatsForStateDirectory(std::filesystem::path StateDir)
HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
JobQueue& BackgroundJobQueue,
ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- BuildStore* BuildStore,
+ std::function<void()>&& FlushFunction,
const LogPaths& LogPaths,
const ZenServerOptions& ServerOptions)
: m_GcScheduler(Scheduler)
, m_BackgroundJobQueue(BackgroundJobQueue)
, m_CacheStore(CacheStore)
-, m_CidStore(CidStore)
-, m_ProjectStore(ProjectStore)
-, m_BuildStore(BuildStore)
+, m_FlushFunction(std::move(FlushFunction))
, m_LogPaths(LogPaths)
, m_ServerOptions(ServerOptions)
{
@@ -247,6 +240,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
Obj.AddFloat("QueueTimeS", GetAgeAsSeconds(CurrentState->CreateTime, CurrentState->StartTime));
Obj.AddFloat("RunTimeS", GetAgeAsSeconds(CurrentState->StartTime, CurrentState->EndTime));
Obj.AddFloat("CompleteTimeS", GetAgeAsSeconds(CurrentState->EndTime, Now));
+ Obj.AddInteger("ReturnCode", CurrentState->ReturnCode);
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
}
break;
@@ -782,22 +776,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler,
"flush",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
- if (m_CidStore)
- {
- m_CidStore->Flush();
- }
- if (m_CacheStore)
- {
- m_CacheStore->Flush();
- }
- if (m_ProjectStore)
- {
- m_ProjectStore->Flush();
- }
- if (m_BuildStore)
- {
- m_BuildStore->Flush();
- }
+ m_FlushFunction();
HttpReq.WriteResponse(HttpResponseCode::OK);
},
HttpVerb::kPost);
diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h
index e7821dead..9a49f5120 100644
--- a/src/zenserver/admin/admin.h
+++ b/src/zenserver/admin/admin.h
@@ -4,15 +4,13 @@
#include <zencore/compactbinary.h>
#include <zenhttp/httpserver.h>
+#include <functional>
namespace zen {
class GcScheduler;
class JobQueue;
class ZenCacheStore;
-class CidStore;
-class ProjectStore;
-class BuildStore;
struct ZenServerOptions;
class HttpAdminService : public zen::HttpService
@@ -27,9 +25,7 @@ public:
HttpAdminService(GcScheduler& Scheduler,
JobQueue& BackgroundJobQueue,
ZenCacheStore* CacheStore,
- CidStore* CidStore,
- ProjectStore* ProjectStore,
- BuildStore* BuildStore,
+ std::function<void()>&& FlushFunction,
const LogPaths& LogPaths,
const ZenServerOptions& ServerOptions);
~HttpAdminService();
@@ -42,9 +38,7 @@ private:
GcScheduler& m_GcScheduler;
JobQueue& m_BackgroundJobQueue;
ZenCacheStore* m_CacheStore;
- CidStore* m_CidStore;
- ProjectStore* m_ProjectStore;
- BuildStore* m_BuildStore;
+ std::function<void()> m_FlushFunction;
LogPaths m_LogPaths;
const ZenServerOptions& m_ServerOptions;
};
diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp
index bcec74ce6..2a3ce41b7 100644
--- a/src/zenserver/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/buildstore/httpbuildstore.cpp
@@ -266,7 +266,7 @@ HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req)
BlobsArrayIt++;
MetadataArrayIt++;
}
- m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads);
+ m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads, &GetSmallWorkerPool(EWorkloadType::Burst));
return ServerRequest.WriteResponse(HttpResponseCode::OK);
}
@@ -484,7 +484,7 @@ HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req)
ResponseWriter.BeginArray("metadataExists"sv);
for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists)
{
- ResponseWriter.AddBool(BlobExists.HasBody);
+ ResponseWriter.AddBool(BlobExists.HasMetadata);
if (BlobExists.HasMetadata)
{
m_BuildStoreStats.BlobExistsMetaHitCount++;
@@ -529,23 +529,11 @@ HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request)
BuildStore::StorageStats StorageStats = m_BuildStore.GetStorageStats();
Cbo << "count" << StorageStats.EntryCount;
- Cbo << "bytes" << StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes + StorageStats.MetadataByteCount;
+ Cbo << "bytes" << StorageStats.BlobBytes + StorageStats.MetadataByteCount;
Cbo.BeginObject("blobs");
{
- Cbo << "count" << (StorageStats.LargeBlobCount + StorageStats.SmallBlobCount);
- Cbo << "bytes" << (StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes);
- Cbo.BeginObject("large");
- {
- Cbo << "count" << StorageStats.LargeBlobCount;
- Cbo << "bytes" << StorageStats.LargeBlobBytes;
- }
- Cbo.EndObject(); // large
- Cbo.BeginObject("small");
- {
- Cbo << "count" << StorageStats.SmallBlobCount;
- Cbo << "bytes" << StorageStats.SmallBlobBytes;
- }
- Cbo.EndObject(); // small
+ Cbo << "count" << StorageStats.BlobCount;
+ Cbo << "bytes" << StorageStats.BlobBytes;
}
Cbo.EndObject(); // blobs
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index bb0c55618..19ac3a216 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -85,7 +85,7 @@ namespace {
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& GetCidStore,
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
@@ -95,11 +95,10 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach
, m_CacheStore(InCacheStore)
, m_StatsService(StatsService)
, m_StatusService(StatusService)
-, m_CidStore(InCidStore)
, m_UpstreamCache(UpstreamCache)
, m_DiskWriteBlocker(InDiskWriteBlocker)
, m_OpenProcessCache(InOpenProcessCache)
-, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker)
+, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker)
{
m_StatsService.RegisterHandler("z$", *this);
m_StatusService.RegisterHandler("z$", *this);
@@ -131,24 +130,6 @@ HttpStructuredCacheService::Flush()
}
void
-HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx)
-{
- if (m_LastScrubTime == Ctx.ScrubTimestamp())
- {
- return;
- }
-
- ZenCacheStore::Info Info = m_CacheStore.GetInfo();
-
- ZEN_INFO("scrubbing '{}'", Info.BasePath);
-
- m_LastScrubTime = Ctx.ScrubTimestamp();
-
- m_CidStore.ScrubStorage(Ctx);
- m_CacheStore.ScrubStorage(Ctx);
-}
-
-void
HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
@@ -243,6 +224,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
for (const auto& NamespaceIt : ValueDetails.Namespaces)
{
const std::string& Namespace = NamespaceIt.first;
+
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace);
+
for (const auto& BucketIt : NamespaceIt.second.Buckets)
{
const std::string& Bucket = BucketIt.first;
@@ -252,7 +236,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
CSVWriter << "\r\n"
<< Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString()
<< ", " << gsl::narrow<uint64_t>(Payload.GetSize());
@@ -270,7 +254,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
size_t AttachmentsSize = 0;
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
AttachmentsSize += Payload.GetSize();
}
CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize);
@@ -292,6 +276,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
for (const auto& NamespaceIt : ValueDetails.Namespaces)
{
const std::string& Namespace = NamespaceIt.first;
+
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace);
+
Cbo.BeginObject();
{
Cbo.AddString("name", Namespace);
@@ -334,7 +321,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
{
Cbo.BeginObject();
Cbo.AddHash("cid", Hash);
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize()));
Cbo.EndObject();
}
@@ -348,7 +335,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request)
size_t AttachmentsSize = 0;
for (const IoHash& Hash : ValueIt.second.Attachments)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Hash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(Hash);
AttachmentsSize += Payload.GetSize();
}
Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize));
@@ -623,6 +610,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName);
+
if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty())
{
ResponseWriter.BeginObject("BucketSizes");
@@ -681,7 +670,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque
uint64_t AttachmentsSize = 0;
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
AllAttachments,
[&](size_t Index, const IoBuffer& Payload) {
ZEN_UNUSED(Index);
@@ -749,6 +738,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName);
+
if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true")
{
CacheContentStats ContentStats;
@@ -775,7 +766,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background);
- m_CidStore.IterateChunks(
+ ChunkStore.IterateChunks(
ContentStats.Attachments,
[&](size_t Index, const IoBuffer& Payload) {
ZEN_UNUSED(Index);
@@ -850,6 +841,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()};
Stopwatch Timer;
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) &&
m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
@@ -864,17 +857,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
uint32_t MissingCount = 0;
CbObjectView CacheRecord(ClientResultValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
if (SkipData)
{
- if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash()))
{
MissingCount++;
}
}
else
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash()))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
@@ -974,6 +967,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
Success = true;
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
ClientResultValue.Value = UpstreamResult.Value;
ClientResultValue.Value.SetContentType(AcceptType);
@@ -997,8 +992,14 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr);
- m_CacheStats.WriteCount++;
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
else if (AcceptType == ZenContentType::kCbPackage)
@@ -1018,6 +1019,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
CacheRecord.IterateAttachments([this,
&Package,
&Ref,
+ &ChunkStore,
&WriteAttachmentBuffers,
&WriteRawHashes,
&ReferencedAttachments,
@@ -1052,12 +1054,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
{
if (SkipData)
{
- if (m_CidStore.ContainsChunk(Hash))
+ if (ChunkStore.ContainsChunk(Hash))
{
Count.Valid++;
}
}
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
+ else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash))
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk));
if (Compressed)
@@ -1083,30 +1085,35 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- nullptr);
- m_CacheStats.WriteCount++;
-
- if (!WriteAttachmentBuffers.empty())
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
- std::vector<CidStore::InsertResult> InsertResults =
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
- for (const CidStore::InsertResult& Result : InsertResults)
+ m_CacheStats.WriteCount++;
+
+ if (!WriteAttachmentBuffers.empty())
{
- if (Result.New)
+ std::vector<CidStore::InsertResult> InsertResults =
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ for (const CidStore::InsertResult& Result : InsertResults)
{
- Count.New++;
+ if (Result.New)
+ {
+ Count.New++;
+ }
}
}
- }
- WriteAttachmentBuffers = {};
- WriteRawHashes = {};
+ WriteAttachmentBuffers = {};
+ WriteRawHashes = {};
+ }
}
BinaryWriter MemStream;
@@ -1197,6 +1204,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
}
+ auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
+ ZEN_UNUSED(PutResult);
+
+ HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
+ switch (PutResult.Status)
+ {
+ case zen::PutStatus::Conflict:
+ ResponseCode = HttpResponseCode::Conflict;
+ break;
+ case zen::PutStatus::Invalid:
+ ResponseCode = HttpResponseCode::BadRequest;
+ break;
+ }
+
+ return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode)
+ : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message);
+ };
+
const HttpContentType ContentType = Request.RequestContentType();
Body.SetContentType(ContentType);
@@ -1225,18 +1250,28 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
{
RawHash = IoHash::HashBuffer(SharedBuffer(Body));
}
- m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- nullptr);
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}",
@@ -1270,17 +1305,34 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
std::vector<IoHash> ReferencedAttachments;
int32_t TotalCount = 0;
- CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
- const IoHash Hash = AttachmentHash.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (m_CidStore.ContainsChunk(Hash))
- {
- ValidAttachments.emplace_back(Hash);
- }
- TotalCount++;
- });
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr);
+ CacheRecord.IterateAttachments(
+ [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) {
+ const IoHash Hash = AttachmentHash.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (ChunkStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ }
+ TotalCount++;
+ });
+
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}",
@@ -1298,10 +1350,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CachePolicy Policy = PolicyFromUrl;
if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbObject,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1334,38 +1388,46 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
WriteAttachmentBuffers.reserve(NumAttachments);
WriteRawHashes.reserve(NumAttachments);
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count](
- CbFieldView HashView) {
- const IoHash Hash = HashView.AsHash();
- ReferencedAttachments.push_back(Hash);
- if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
- {
- if (Attachment->IsCompressedBinary())
- {
- WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
- WriteRawHashes.push_back(Hash);
- ValidAttachments.emplace_back(Hash);
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- Hash);
- Count.Invalid++;
- }
- }
- else if (m_CidStore.ContainsChunk(Hash))
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ CacheRecord.IterateAttachments([this,
+ &Ref,
+ &Package,
+ &ChunkStore,
+ &WriteAttachmentBuffers,
+ &WriteRawHashes,
+ &ValidAttachments,
+ &ReferencedAttachments,
+ &Count](CbFieldView HashView) {
+ const IoHash Hash = HashView.AsHash();
+ ReferencedAttachments.push_back(Hash);
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
+ {
+ if (Attachment->IsCompressedBinary())
{
+ WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer());
+ WriteRawHashes.push_back(Hash);
ValidAttachments.emplace_back(Hash);
Count.Valid++;
}
- Count.Total++;
- });
+ else
+ {
+ ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ToString(HttpContentType::kCbPackage),
+ Hash);
+ Count.Invalid++;
+ }
+ }
+ else if (ChunkStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
@@ -1373,15 +1435,23 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+
ZenCacheValue CacheValue;
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return WriteFailureResponse(PutResult);
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
{
- std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (const CidStore::InsertResult& InsertResult : InsertResults)
{
if (InsertResult.New)
@@ -1408,10 +1478,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Namespace = Ref.Namespace,
- .Key = {Ref.BucketSegment, Ref.HashKey},
- .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage,
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)},
+ [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); });
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -1445,7 +1517,9 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
Stopwatch Timer;
- IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId);
const UpstreamEndpointInfo* Source = nullptr;
CachePolicy Policy = PolicyFromUrl;
@@ -1467,7 +1541,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons
{
if (AreDiskWritesAllowed())
{
- m_CidStore.AddChunk(UpstreamResult.Value, RawHash);
+ ChunkStore.AddChunk(UpstreamResult.Value, RawHash);
}
Source = UpstreamResult.Source;
}
@@ -1561,7 +1635,9 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons
"ValueContentId does not match attachment hash"sv);
}
- CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash);
+ CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace);
+
+ CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash);
ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}",
Ref.Namespace,
@@ -1776,16 +1852,51 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
EmitSnapshot("requests", m_HttpRequests, Cbo);
- const uint64_t HitCount = m_CacheStats.HitCount;
- const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
- const uint64_t MissCount = m_CacheStats.MissCount;
- const uint64_t WriteCount = m_CacheStats.WriteCount;
- const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
- struct CidStoreStats StoreStats = m_CidStore.Stats();
- const uint64_t ChunkHitCount = StoreStats.HitCount;
- const uint64_t ChunkMissCount = StoreStats.MissCount;
- const uint64_t ChunkWriteCount = StoreStats.WriteCount;
- const uint64_t TotalCount = HitCount + MissCount;
+ const uint64_t HitCount = m_CacheStats.HitCount;
+ const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount;
+ const uint64_t MissCount = m_CacheStats.MissCount;
+ const uint64_t WriteCount = m_CacheStats.WriteCount;
+ const uint64_t BadRequestCount = m_CacheStats.BadRequestCount;
+
+ uint64_t TotalChunkHitCount = 0;
+ uint64_t TotalChunkMissCount = 0;
+ uint64_t TotalChunkWriteCount = 0;
+ CidStoreSize TotalCidSize;
+
+ tsl::robin_map<CidStore*, std::string> UniqueStores;
+ {
+ std::vector<std::string> NamespaceNames = m_CacheStore.GetNamespaces();
+
+ for (const std::string& NamespaceName : NamespaceNames)
+ {
+ CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName);
+ if (auto It = UniqueStores.find(Store); It == UniqueStores.end())
+ {
+ UniqueStores.insert_or_assign(Store, NamespaceName);
+ }
+ else
+ {
+ UniqueStores.insert_or_assign(Store, std::string{});
+ }
+ }
+
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+
+ CidStoreStats StoreStats = ChunkStore->Stats();
+ CidStoreSize StoreSize = ChunkStore->TotalSize();
+
+ TotalChunkHitCount += StoreStats.HitCount;
+ TotalChunkMissCount += StoreStats.MissCount;
+ TotalChunkWriteCount += StoreStats.WriteCount;
+
+ TotalCidSize.TinySize += StoreSize.TinySize;
+ TotalCidSize.SmallSize += StoreSize.SmallSize;
+ TotalCidSize.LargeSize += StoreSize.LargeSize;
+ TotalCidSize.TotalSize += StoreSize.TotalSize;
+ }
+ }
const uint64_t RpcRequests = m_CacheStats.RpcRequests;
const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests;
@@ -1795,17 +1906,11 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests;
const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests;
- const CidStoreSize CidSize = m_CidStore.TotalSize();
- const GcStorageSize CacheSize = m_CacheStore.StorageSize();
+ const CacheStoreSize CacheSize = m_CacheStore.TotalSize();
bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true";
bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true";
- CidStoreStats CidStoreStats = {};
- if (ShowCidStoreStats)
- {
- CidStoreStats = m_CidStore.Stats();
- }
ZenCacheStore::CacheStoreStats CacheStoreStats = {};
if (ShowCacheStoreStats)
{
@@ -1840,6 +1945,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Cbo.EndObject();
Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount;
+ const uint64_t TotalCount = HitCount + MissCount;
Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0);
if (m_UpstreamCache.IsActive())
@@ -1850,7 +1956,9 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0);
}
- Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount;
+ Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount;
+ const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount;
+ Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0);
if (ShowCacheStoreStats)
{
@@ -1959,20 +2067,58 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request)
{
Cbo.BeginObject("size");
{
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
+ Cbo << "tiny" << TotalCidSize.TinySize;
+ Cbo << "small" << TotalCidSize.SmallSize;
+ Cbo << "large" << TotalCidSize.LargeSize;
+ Cbo << "total" << TotalCidSize.TotalSize;
}
Cbo.EndObject();
if (ShowCidStoreStats)
{
Cbo.BeginObject("store");
- Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount;
- EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo);
- EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo);
- // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo);
+
+ auto OutputStats = [&](CidStore& ChunkStore) {
+ CidStoreStats StoreStats = ChunkStore.Stats();
+ Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount;
+ const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount;
+ Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0);
+ EmitSnapshot("read", StoreStats.FindChunkOps, Cbo);
+ EmitSnapshot("write", StoreStats.AddChunkOps, Cbo);
+ };
+
+ if (UniqueStores.size() > 1)
+ {
+ Cbo.BeginArray("namespaces");
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+ const std::string& Namespace = It.second;
+ CidStoreSize ChunkStoreSize = ChunkStore->TotalSize();
+ Cbo.BeginObject();
+ {
+ Cbo << "namespace" << Namespace;
+ Cbo.BeginObject("stats");
+ OutputStats(*ChunkStore);
+ Cbo.EndObject();
+
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << ChunkStoreSize.TinySize;
+ Cbo << "small" << ChunkStoreSize.SmallSize;
+ Cbo << "large" << ChunkStoreSize.LargeSize;
+ Cbo << "total" << ChunkStoreSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray(); // namespaces
+ }
+ else if (UniqueStores.size() != 0)
+ {
+ OutputStats(*UniqueStores.begin()->first);
+ }
Cbo.EndObject();
}
}
diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h
index 13c1d6475..d46ca145d 100644
--- a/src/zenserver/cache/httpstructuredcache.h
+++ b/src/zenserver/cache/httpstructuredcache.h
@@ -70,8 +70,10 @@ namespace cache {
class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider
{
public:
+ typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc;
+
HttpStructuredCacheService(ZenCacheStore& InCacheStore,
- CidStore& InCidStore,
+ GetCidStoreFunc&& GetCidStore,
HttpStatsService& StatsService,
HttpStatusService& StatusService,
UpstreamCache& UpstreamCache,
@@ -83,7 +85,6 @@ public:
virtual void HandleRequest(HttpServerRequest& Request) override;
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
private:
struct CacheRef
@@ -116,9 +117,7 @@ private:
ZenCacheStore& m_CacheStore;
HttpStatsService& m_StatsService;
HttpStatusService& m_StatusService;
- CidStore& m_CidStore;
UpstreamCache& m_UpstreamCache;
- uint64_t m_LastScrubTime = 0;
metrics::OperationTiming m_HttpRequests;
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 1f9ae5fb6..23bb3ad78 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -342,6 +342,7 @@ public:
Writer.WriteValue("payloadalignment", fmt::format("{}", BucketConfig.PayloadAlignment));
Writer.WriteValue("largeobjectthreshold", fmt::format("{}", BucketConfig.PayloadAlignment));
+ Writer.WriteValue("limitoverwrites", fmt::format("{}", BucketConfig.LimitOverwrites));
}
Writer.EndContainer();
}
@@ -397,6 +398,8 @@ public:
}
BucketConfig.LargeObjectThreshold = LargeObjectThreshold;
+ BucketConfig.LimitOverwrites = Bucket.value().get_or("limitoverwrites", BucketConfig.LimitOverwrites);
+
Value.push_back(std::make_pair(std::move(Name), BucketConfig));
}
}
@@ -542,6 +545,9 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.bucket.largeobjectthreshold"sv,
ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold,
"cache-bucket-largeobjectthreshold"sv);
+ LuaOptions.AddOption("cache.bucket.limitoverwrites"sv,
+ ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites,
+ "cache-bucket-limit-overwrites"sv);
////// cache.upstream
LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv);
@@ -1120,6 +1126,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold)->default_value("1024"),
"");
+ options.add_option("cache",
+ "",
+ "cache-bucket-limit-overwrites",
+ "Whether to require policy flag pattern before allowing overwrites in cache bucket",
+ cxxopts::value<bool>(ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites)->default_value("false"),
+ "");
+
options.add_option("gc",
"",
"gc-cache-attachment-store",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 9753e3ae2..4a022a807 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -125,6 +125,7 @@ struct ZenStructuredCacheBucketConfig
uint32_t PayloadAlignment = 1u << 4;
uint64_t MemCacheSizeThreshold = 1 * 1024;
uint64_t LargeObjectThreshold = 128 * 1024;
+ bool LimitOverwrites = false;
};
struct ZenStructuredCacheConfig
diff --git a/src/zenserver/frontend/html.zip b/src/zenserver/frontend/html.zip
index 5778fa3d2..1de4c3d74 100644
--- a/src/zenserver/frontend/html.zip
+++ b/src/zenserver/frontend/html.zip
Binary files differ
diff --git a/src/zenserver/frontend/html/indexer/worker.js b/src/zenserver/frontend/html/indexer/worker.js
index 69ee234fa..c0cbb7e11 100644
--- a/src/zenserver/frontend/html/indexer/worker.js
+++ b/src/zenserver/frontend/html/indexer/worker.js
@@ -73,33 +73,38 @@ async function map_id_to_key(project_id, oplog, start, end, page_size, stride)
else if (field.is_named("packagedata")) pkg_data = field;
else if (field.is_named("bulkdata")) bulk_data = field;
}
- if (key == undefined || pkg_data == undefined)
+
+ if (key == undefined)
continue;
var id = 0n;
var size = 0n;
var raw_size = 0n;
- for (const item of pkg_data.as_array())
- {
- var found = 0, pkg_id = undefined;
- for (const field of item.as_object())
+
+ if (pkg_data)
+ {
+ for (const item of pkg_data.as_array())
{
- if (!id && field.is_named("id")) pkg_id = field.as_value();
- else if (field.is_named("size")) size += field.as_value();
- else if (field.is_named("rawsize")) raw_size += field.as_value();
- else continue;
- if (found++ >= 3)
- break;
- }
+ var found = 0, pkg_id = undefined;
+ for (const field of item.as_object())
+ {
+ if (!id && field.is_named("id")) pkg_id = field.as_value();
+ else if (field.is_named("size")) size += field.as_value();
+ else if (field.is_named("rawsize")) raw_size += field.as_value();
+ else continue;
+ if (found++ >= 3)
+ break;
+ }
- if (pkg_id === undefined)
- continue;
+ if (pkg_id === undefined)
+ continue;
- pkg_id = pkg_id.subarray(0, 8);
- for (var i = 7; i >= 0; --i)
- {
- id <<= 8n;
- id |= BigInt(pkg_id[i]);
+ pkg_id = pkg_id.subarray(0, 8);
+ for (var i = 7; i >= 0; --i)
+ {
+ id <<= 8n;
+ id |= BigInt(pkg_id[i]);
+ }
}
}
@@ -119,9 +124,6 @@ async function map_id_to_key(project_id, oplog, start, end, page_size, stride)
}
}
- if (id == 0)
- continue;
-
result[count] = [id, key.as_value(), size, raw_size];
count++;
}
diff --git a/src/zenserver/frontend/html/pages/entry.js b/src/zenserver/frontend/html/pages/entry.js
index 54fb11c18..08589b090 100644
--- a/src/zenserver/frontend/html/pages/entry.js
+++ b/src/zenserver/frontend/html/pages/entry.js
@@ -239,30 +239,34 @@ export class Page extends ZenPage
_convert_legacy_to_tree(entry)
{
- const pkg_data = entry.find("packagedata");
- if (pkg_data == undefined)
- return
+ const raw_pkgst_entry = entry.find("packagestoreentry");
+ if (raw_pkgst_entry == undefined) //if there is no packagestorentry then don't show the fancy webpage, just show the raw json
+ return;
const tree = {};
- var id = 0n;
- for (var item of pkg_data.as_array())
+ const pkg_data = entry.find("packagedata");
+ if (pkg_data)
{
- var pkg_id = item.as_object().find("id");
- if (pkg_id == undefined)
- continue;
-
- pkg_id = pkg_id.as_value().subarray(0, 8);
- for (var i = 7; i >= 0; --i)
+ var id = 0n;
+ for (var item of pkg_data.as_array())
{
- id <<= 8n;
- id |= BigInt(pkg_id[i]);
+ var pkg_id = item.as_object().find("id");
+ if (pkg_id == undefined)
+ continue;
+
+ pkg_id = pkg_id.as_value().subarray(0, 8);
+ for (var i = 7; i >= 0; --i)
+ {
+ id <<= 8n;
+ id |= BigInt(pkg_id[i]);
+ }
+ break;
}
- break;
+ tree["$id"] = id;
}
- tree["$id"] = id;
- const pkgst_entry = entry.find("packagestoreentry").as_object();
+ const pkgst_entry = raw_pkgst_entry.as_object();
for (const field of pkgst_entry)
{
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index 76e022017..4ea4ee87e 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -167,6 +167,14 @@ ZenEntryPoint::Run()
ZEN_INFO("ServerState.Sweep()");
ServerState.Sweep();
+ auto NotifyReady = [&] {
+ if (!m_ServerOptions.ChildId.empty())
+ {
+ NamedEvent ParentEvent{m_ServerOptions.ChildId};
+ ParentEvent.Set();
+ }
+ };
+
uint32_t AttachSponsorProcessRetriesLeft = 3;
ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(m_ServerOptions.BasePort);
while (Entry)
@@ -205,6 +213,7 @@ ZenEntryPoint::Run()
// Sponsor processes are checked every second, so 2 second wait time should be enough
if (Entry->AddSponsorProcess(m_ServerOptions.OwnerPid, 2000))
{
+ NotifyReady();
std::exit(0);
}
if (AttachSponsorProcessRetriesLeft-- > 0)
@@ -350,14 +359,8 @@ ZenEntryPoint::Run()
Server.SetIsReadyFunc([&] {
m_LockFile.Update(MakeLockData(true), Ec);
-
- if (!m_ServerOptions.ChildId.empty())
- {
- NamedEvent ParentEvent{m_ServerOptions.ChildId};
- ParentEvent.Set();
- }
-
ReportServiceStatus(ServiceStatus::Running);
+ NotifyReady();
});
Server.Run();
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
index ab96ae92d..52cdc5983 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -3,14 +3,12 @@
#include "buildsremoteprojectstore.h"
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
#include <zenhttp/httpclientauth.h>
-
-#include <zenutil/jupiter/jupiterclient.h>
-#include <zenutil/jupiter/jupitersession.h>
+#include <zenutil/jupiter/jupiterbuildstorage.h>
namespace zen {
@@ -21,20 +19,24 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
class BuildsRemoteStore : public RemoteProjectStore
{
public:
- BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient,
- std::string_view Namespace,
- std::string_view Bucket,
- const Oid& BuildId,
- const IoBuffer& MetaData,
- bool ForceDisableBlocks,
- bool ForceDisableTempBlocks,
- const std::filesystem::path& TempFilePath)
- : m_JupiterClient(std::move(InJupiterClient))
+ BuildsRemoteStore(std::unique_ptr<BuildStorage::Statistics>&& BuildStorageStats,
+ std::unique_ptr<HttpClient>&& BuildStorageHttp,
+ std::unique_ptr<BuildStorage>&& BuildStorage,
+ std::string_view Url,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const Oid& BuildId,
+ const IoBuffer& MetaData,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks)
+ : m_BuildStorageStats(std::move(BuildStorageStats))
+ , m_BuildStorageHttp(std::move(BuildStorageHttp))
+ , m_BuildStorage(std::move(BuildStorage))
+ , m_Url(Url)
, m_Namespace(Namespace)
, m_Bucket(Bucket)
, m_BuildId(BuildId)
, m_MetaData(MetaData)
- , m_TempFilePath(TempFilePath)
, m_EnableBlocks(!ForceDisableBlocks)
, m_UseTempBlocks(!ForceDisableTempBlocks)
{
@@ -47,63 +49,91 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
- .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)};
+ .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)};
}
virtual Stats GetStats() const override
{
- return {.m_SentBytes = m_SentBytes.load(),
- .m_ReceivedBytes = m_ReceivedBytes.load(),
- .m_RequestTimeNS = m_RequestTimeNS.load(),
- .m_RequestCount = m_RequestCount.load(),
- .m_PeakSentBytes = m_PeakSentBytes.load(),
- .m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
- .m_PeakBytesPerSec = m_PeakBytesPerSec.load()};
+ return {
+ .m_SentBytes = m_BuildStorageStats->TotalBytesWritten.load(),
+ .m_ReceivedBytes = m_BuildStorageStats->TotalBytesRead.load(),
+ .m_RequestTimeNS = m_BuildStorageStats->TotalRequestTimeUs.load() * 1000,
+ .m_RequestCount = m_BuildStorageStats->TotalRequestCount.load(),
+ .m_PeakSentBytes = m_BuildStorageStats->PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_BuildStorageStats->PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_BuildStorageStats->PeakBytesPerSec.load(),
+ };
}
virtual CreateContainerResult CreateContainer() override
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
-
- IoBuffer Payload = m_MetaData;
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload);
- AddStats(PutResult);
+ CreateContainerResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- CreateContainerResult Result{ConvertResult(PutResult)};
- if (Result.ErrorCode)
+ CbObject Payload = LoadCompactBinaryObject(m_MetaData);
+ try
{
- Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
+ CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload);
+ ZEN_UNUSED(PutBuildResult);
+ m_OplogBuildPartId = Oid::NewOid();
}
- m_OplogBuildPartId = Oid::NewOid();
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason =
+ fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
+ }
+
return Result;
}
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- PutBuildPartResult PutResult =
- Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload);
- AddStats(PutResult);
- SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
- if (Result.ErrorCode)
+ SaveResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
{
- Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
+ CbObject ObjectPayload = LoadCompactBinaryObject(Payload);
+
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload);
+ Result.RawHash = PutBuildPartResult.first;
+ Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end());
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
}
return Result;
@@ -114,52 +144,84 @@ public:
ChunkBlockDescription&& Block) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult PutResult =
- Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- AddStats(PutResult);
+ SaveAttachmentResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- SaveAttachmentResult Result{ConvertResult(PutResult)};
- if (Result.ErrorCode)
+ try
{
- Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- Result.Reason);
- return Result;
- }
+ m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
- if (Block.BlockHash == RawHash)
- {
- CbObjectWriter BlockMetaData;
- BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
-
- IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer();
- MetaPayload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload);
- AddStats(PutMetaResult);
- RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult);
- if (MetaDataResult.ErrorCode)
+ if (Block.BlockHash == RawHash)
{
- ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- MetaDataResult.Reason);
+ try
+ {
+ CbObjectWriter BlockMetaData;
+ BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
+ CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save());
+ if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload))
+ {
+ ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ RawHash,
+ "not found");
+ }
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
}
}
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+
return Result;
}
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
{
SaveAttachmentsResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
for (const SharedBuffer& Chunk : Chunks)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
@@ -177,38 +239,68 @@ public:
ZEN_UNUSED(RawHash);
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- FinalizeBuildPartResult FinalizeRefResult =
- Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash);
- AddStats(FinalizeRefResult);
+ FinalizeResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
- FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
- if (Result.ErrorCode)
+ try
+ {
+ std::vector<IoHash> Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash);
+ Result.Needs = std::unordered_set<IoHash, IoHash::Hasher>(Needs.begin(), Needs.end());
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0;
+ Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
{
- Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId,
+ Ex.what());
}
- else if (Result.Needs.empty())
+
+ if (!Result.ErrorCode && Result.Needs.empty())
{
- JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId);
- AddStats(FinalizeBuildResult);
- FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds;
- Result = {ConvertResult(FinalizeBuildResult)};
- if (Result.ErrorCode)
+ try
{
- Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- FinalizeBuildResult.Reason);
+ m_BuildStorage->FinalizeBuild(m_BuildId);
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = Ex.m_Error != 0 ? Ex.m_Error
+ : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode
+ : 0;
+ Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ Ex.what());
}
}
+
return Result;
}
@@ -216,161 +308,128 @@ public:
{
ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId);
- AddStats(GetBuildResult);
- LoadContainerResult Result{ConvertResult(GetBuildResult)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
- return Result;
- }
- CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response);
- if (!BuildObject)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
- }
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- if (!PartsObject)
+ LoadContainerResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
+ CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId);
+
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ if (!PartsObject)
+ {
+ throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv,
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId));
+ }
+ m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
+ if (m_OplogBuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
+ m_Url,
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ OplogContainerPartName));
+ }
+
+ Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId);
}
- m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
- if (m_OplogBuildPartId == Oid::Zero)
+ catch (const HttpClientError& Ex)
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_JupiterClient->ServiceUrl(),
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
m_Namespace,
m_Bucket,
m_BuildId,
- OplogContainerPartName);
- return Result;
+ Ex.what());
}
-
- JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId);
- AddStats(GetBuildPartResult);
- Result = {ConvertResult(GetBuildResult)};
- Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds;
- if (Result.ErrorCode)
+ catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
+ Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'",
+ m_Url,
m_Namespace,
m_Bucket,
m_BuildId,
- m_OplogBuildPartId,
- Result.Reason);
- return Result;
+ Ex.what());
}
- CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response);
- if (!ContainerObject)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- m_OplogBuildPartId);
- return Result;
- }
- Result.ContainerObject = std::move(ContainerObject);
return Result;
}
virtual GetKnownBlocksResult GetKnownBlocks() override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1);
- AddStats(FindResult);
- GetKnownBlocksResult Result{ConvertResult(FindResult)};
- if (Result.ErrorCode)
+
+ GetKnownBlocksResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Result.Reason);
- return Result;
+ CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u);
+ std::optional<std::vector<ChunkBlockDescription>> Blocks = ParseChunkBlockDescriptionList(KnownBlocks);
+ Result.Blocks.reserve(Blocks.value().size());
+ for (ChunkBlockDescription& BlockDescription : Blocks.value())
+ {
+ Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash,
+ .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)});
+ }
}
- if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None)
+ catch (const HttpClientError& Ex)
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
- std::optional<std::vector<ChunkBlockDescription>> Blocks =
- ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response));
- if (!Blocks)
+ catch (const std::exception& Ex)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv,
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId);
- return Result;
- }
- Result.Blocks.reserve(Blocks.value().size());
- for (ChunkBlockDescription& BlockDescription : Blocks.value())
- {
- Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash,
- .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)});
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
+
return Result;
}
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath);
- AddStats(GetResult);
- LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
+ LoadAttachmentResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
+
+ try
+ {
+ Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
+ }
+ catch (const HttpClientError& Ex)
+ {
+ Result.ErrorCode = MakeErrorCode(Ex);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
+ }
+ catch (const std::exception& Ex)
{
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- RawHash,
- Result.Reason);
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason =
+ fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_Url, m_Namespace, m_Bucket, m_BuildId, Ex.what());
}
+
return Result;
}
virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
LoadAttachmentsResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() * 1000000.0; });
for (const IoHash& Hash : RawHashes)
{
LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
@@ -386,81 +445,27 @@ public:
}
private:
- void AddStats(const JupiterResult& Result)
- {
- m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
- m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
- m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
- SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
- SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
- }
-
- m_RequestCount.fetch_add(1);
- }
-
- static Result ConvertResult(const JupiterResult& Response)
+ static int MakeErrorCode(const HttpClientError& Ex)
{
- std::string Text;
- int32_t ErrorCode = 0;
- if (Response.ErrorCode != 0 || !Response.Success)
- {
- if (Response.Response)
- {
- HttpContentType ContentType = Response.Response.GetContentType();
- if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
- Response.Response.GetSize()));
- Text = SB.ToString();
- }
- else if (ContentType == ZenContentType::kCbObject)
- {
- ExtendableStringBuilder<256> SB;
- SB.Append("\n");
- CompactBinaryToJson(Response.Response.GetView(), SB);
- Text = SB.ToString();
- }
- }
- }
- if (Response.ErrorCode != 0)
- {
- ErrorCode = Response.ErrorCode;
- }
- else if (!Response.Success)
- {
- ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- }
- return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text};
+ return Ex.m_Error != 0 ? Ex.m_Error : Ex.m_ResponseCode != HttpResponseCode::ImATeapot ? (int)Ex.m_ResponseCode : 0;
}
- Ref<JupiterClient> m_JupiterClient;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const Oid m_BuildId;
- IoBuffer m_MetaData;
- Oid m_OplogBuildPartId = Oid::Zero;
- std::filesystem::path m_TempFilePath;
- const bool m_EnableBlocks = true;
- const bool m_UseTempBlocks = true;
- const bool m_AllowRedirect = false;
-
- std::atomic_uint64_t m_SentBytes = {};
- std::atomic_uint64_t m_ReceivedBytes = {};
- std::atomic_uint64_t m_RequestTimeNS = {};
- std::atomic_uint64_t m_RequestCount = {};
- std::atomic_uint64_t m_PeakSentBytes = {};
- std::atomic_uint64_t m_PeakReceivedBytes = {};
- std::atomic_uint64_t m_PeakBytesPerSec = {};
+ std::unique_ptr<BuildStorage::Statistics> m_BuildStorageStats;
+ std::unique_ptr<HttpClient> m_BuildStorageHttp;
+ std::unique_ptr<BuildStorage> m_BuildStorage;
+ const std::string m_Url;
+ const std::string m_Namespace;
+ const std::string m_Bucket;
+ const Oid m_BuildId;
+ IoBuffer m_MetaData;
+ Oid m_OplogBuildPartId = Oid::Zero;
+ const bool m_EnableBlocks = true;
+ const bool m_UseTempBlocks = true;
+ const bool m_AllowRedirect = false;
};
std::shared_ptr<RemoteProjectStore>
-CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
+CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -468,13 +473,7 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
// Assume https URL
Url = fmt::format("https://{}"sv, Url);
}
- JupiterClientOptions ClientOptions{.Name = "Remote store"sv,
- .ServiceUrl = Url,
- .ConnectTimeout = std::chrono::milliseconds(2000),
- .Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 4};
+
// 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
// 2) Access token as parameter in request
// 3) Environment variable (different win vs linux/mac)
@@ -491,7 +490,7 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
}
else if (!Options.OidcExePath.empty())
{
- if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe)
+ if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet); TokenProviderMaybe)
{
TokenProvider = TokenProviderMaybe.value();
}
@@ -502,16 +501,31 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file
TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
}
- Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider)));
+ HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
+ .ConnectTimeout = std::chrono::milliseconds(2000),
+ .Timeout = std::chrono::milliseconds(1800000),
+ .AccessTokenProvider = std::move(TokenProvider),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 4};
+
+ std::unique_ptr<BuildStorage::Statistics> BuildStorageStats(std::make_unique<BuildStorage::Statistics>());
+
+ std::unique_ptr<HttpClient> BuildStorageHttp = std::make_unique<HttpClient>(Url, ClientSettings);
+
+ std::unique_ptr<BuildStorage> BuildStorage =
+ CreateJupiterBuildStorage(Log(), *BuildStorageHttp, *BuildStorageStats, Options.Namespace, Options.Bucket, false, TempFilePath);
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client),
+ std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(BuildStorageStats),
+ std::move(BuildStorageHttp),
+ std::move(BuildStorage),
+ Url,
Options.Namespace,
Options.Bucket,
Options.BuildId,
Options.MetaData,
Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks,
- TempFilePath);
+ Options.ForceDisableTempBlocks);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h
index c52b13886..60b6caef7 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.h
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.h
@@ -24,7 +24,8 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions
IoBuffer MetaData;
};
-std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath);
+std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(const BuildsRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath,
+ bool Quiet);
} // namespace zen
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 317a419eb..9600133f3 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -235,13 +235,11 @@ namespace {
//////////////////////////////////////////////////////////////////////////
-HttpProjectService::HttpProjectService(CidStore& Store,
- ProjectStore* Projects,
+HttpProjectService::HttpProjectService(ProjectStore* Projects,
HttpStatusService& StatusService,
HttpStatsService& StatsService,
AuthMgr& AuthMgr)
: m_Log(logging::Get("project"))
-, m_CidStore(Store)
, m_ProjectStore(Projects)
, m_StatusService(StatusService)
, m_StatsService(StatsService)
@@ -407,8 +405,45 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
{
ZEN_TRACE_CPU("ProjectService::Stats");
- const GcStorageSize StoreSize = m_ProjectStore->StorageSize();
- const CidStoreSize CidSize = m_CidStore.TotalSize();
+ bool ShowCidStoreStats = HttpReq.GetQueryParams().GetValue("cidstorestats") == "true";
+
+ const GcStorageSize StoreSize = m_ProjectStore->StorageSize();
+ uint64_t TotalChunkHitCount = 0;
+ uint64_t TotalChunkMissCount = 0;
+ uint64_t TotalChunkWriteCount = 0;
+ CidStoreSize TotalCidSize;
+
+ tsl::robin_map<CidStore*, std::string> UniqueStores;
+ {
+ m_ProjectStore->IterateProjects([&UniqueStores](ProjectStore::Project& Project) {
+ CidStore* Store = &Project.GetCidStore();
+ if (auto It = UniqueStores.find(Store); It == UniqueStores.end())
+ {
+ UniqueStores.insert_or_assign(Store, Project.Identifier);
+ }
+ else
+ {
+ UniqueStores.insert_or_assign(Store, std::string{});
+ }
+ });
+
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+
+ CidStoreStats ChunkStoreStats = ChunkStore->Stats();
+ CidStoreSize ChunkStoreSize = ChunkStore->TotalSize();
+
+ TotalChunkHitCount += ChunkStoreStats.HitCount;
+ TotalChunkMissCount += ChunkStoreStats.MissCount;
+ TotalChunkWriteCount += ChunkStoreStats.WriteCount;
+
+ TotalCidSize.TinySize += ChunkStoreSize.TinySize;
+ TotalCidSize.SmallSize += ChunkStoreSize.SmallSize;
+ TotalCidSize.LargeSize += ChunkStoreSize.LargeSize;
+ TotalCidSize.TotalSize += ChunkStoreSize.TotalSize;
+ }
+ }
CbObjectWriter Cbo;
@@ -460,12 +495,66 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq)
{
Cbo.BeginObject("size");
{
- Cbo << "tiny" << CidSize.TinySize;
- Cbo << "small" << CidSize.SmallSize;
- Cbo << "large" << CidSize.LargeSize;
- Cbo << "total" << CidSize.TotalSize;
+ Cbo << "tiny" << TotalCidSize.TinySize;
+ Cbo << "small" << TotalCidSize.SmallSize;
+ Cbo << "large" << TotalCidSize.LargeSize;
+ Cbo << "total" << TotalCidSize.TotalSize;
}
Cbo.EndObject();
+
+ if (ShowCidStoreStats)
+ {
+ Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount;
+ const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount;
+ Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0);
+
+ Cbo.BeginObject("store");
+
+ auto OutputStats = [&](CidStore& ChunkStore) {
+ CidStoreStats StoreStats = ChunkStore.Stats();
+ Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount;
+ const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount;
+ Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0);
+ EmitSnapshot("read", StoreStats.FindChunkOps, Cbo);
+ EmitSnapshot("write", StoreStats.AddChunkOps, Cbo);
+ };
+
+ if (UniqueStores.size() > 1)
+ {
+ Cbo.BeginArray("projects");
+ for (auto It : UniqueStores)
+ {
+ CidStore* ChunkStore = It.first;
+ const std::string& ProjectId = It.second;
+ CidStoreSize ChunkStoreSize = ChunkStore->TotalSize();
+
+ Cbo.BeginObject();
+ {
+ Cbo << "project" << ProjectId;
+ Cbo.BeginObject("stats");
+ OutputStats(*ChunkStore);
+ Cbo.EndObject();
+
+ Cbo.BeginObject("size");
+ {
+ Cbo << "tiny" << ChunkStoreSize.TinySize;
+ Cbo << "small" << ChunkStoreSize.SmallSize;
+ Cbo << "large" << ChunkStoreSize.LargeSize;
+ Cbo << "total" << ChunkStoreSize.TotalSize;
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray(); // projects
+ }
+ else if (UniqueStores.size() != 0)
+ {
+ CidStore& ChunkStore = *UniqueStores.begin()->first;
+ OutputStats(ChunkStore);
+ }
+ Cbo.EndObject();
+ }
}
Cbo.EndObject();
@@ -1125,6 +1214,8 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
}
Project->TouchOplog(OplogId);
+ CidStore& ChunkStore = Project->GetCidStore();
+
ProjectStore::Oplog& Oplog = *FoundLog;
IoBuffer Payload = HttpReq.ReadPayload();
@@ -1137,7 +1228,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
std::vector<IoHash> MissingChunks;
CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
- if (m_CidStore.ContainsChunk(Hash))
+ if (ChunkStore.ContainsChunk(Hash))
{
// Return null attachment as we already have it, no point in reading it and storing it again
return {};
@@ -1393,6 +1484,8 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
}
Project->TouchOplog(OplogId);
+ CidStore& ChunkStore = Project->GetCidStore();
+
ProjectStore::Oplog& Oplog = *FoundLog;
if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
@@ -1407,7 +1500,7 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req)
Op.IterateAttachments([&](CbFieldView FieldView) {
const IoHash AttachmentHash = FieldView.AsAttachment();
- IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash);
if (Payload)
{
switch (Payload.GetContentType())
@@ -2036,11 +2129,14 @@ HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req)
CSVHeader(Details, AttachmentDetails, CSVWriter);
m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
+ CidStore& ChunkStore = Project.GetCidStore();
+
Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
- Oplog.IterateOplogWithKey(
- [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) {
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
- });
+ Oplog.IterateOplogWithKey([this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
});
});
@@ -2054,8 +2150,9 @@ HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req)
m_ProjectStore->DiscoverProjects();
m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) {
- std::vector<std::string> OpLogs = Project.ScanForOplogs();
- CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ CidStore& ChunkStore = Project.GetCidStore();
+ std::vector<std::string> OpLogs = Project.ScanForOplogs();
+ CbWriteProject(ChunkStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
});
}
Cbo.EndArray();
@@ -2084,7 +2181,8 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req)
{
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- ProjectStore::Project& Project = *FoundProject.Get();
+ ProjectStore::Project& Project = *FoundProject.Get();
+ CidStore& ChunkStore = Project.GetCidStore();
if (CSV)
{
@@ -2092,10 +2190,11 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req)
CSVHeader(Details, AttachmentDetails, CSVWriter);
FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) {
- Oplog.IterateOplogWithKey(
- [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) {
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
- });
+ Oplog.IterateOplogWithKey([this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN,
+ const Oid& Key,
+ CbObjectView Op) {
+ CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ });
});
HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
}
@@ -2105,7 +2204,7 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req)
std::vector<std::string> OpLogs = FoundProject->ScanForOplogs();
Cbo.BeginArray("projects");
{
- CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
+ CbWriteProject(ChunkStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo);
}
Cbo.EndArray();
HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
@@ -2141,16 +2240,17 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req)
return HttpReq.WriteResponse(HttpResponseCode::NotFound);
}
- ProjectStore::Project& Project = *FoundProject.Get();
- ProjectStore::Oplog& Oplog = *FoundLog;
+ ProjectStore::Project& Project = *FoundProject.Get();
+ CidStore& ChunkStore = Project.GetCidStore();
+ ProjectStore::Oplog& Oplog = *FoundLog;
if (CSV)
{
ExtendableStringBuilder<4096> CSVWriter;
CSVHeader(Details, AttachmentDetails, CSVWriter);
Oplog.IterateOplogWithKey(
- [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) {
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
+ [this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) {
+ CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter);
});
HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
}
@@ -2159,7 +2259,7 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req)
CbObjectWriter Cbo;
Cbo.BeginArray("oplogs");
{
- CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
+ CbWriteOplog(ChunkStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo);
}
Cbo.EndArray();
HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
@@ -2204,9 +2304,10 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req)
fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId));
}
- const Oid ObjId = Oid::FromHexString(OpId);
- ProjectStore::Project& Project = *FoundProject.Get();
- ProjectStore::Oplog& Oplog = *FoundLog;
+ const Oid ObjId = Oid::FromHexString(OpId);
+ ProjectStore::Project& Project = *FoundProject.Get();
+ CidStore& ChunkStore = Project.GetCidStore();
+ ProjectStore::Oplog& Oplog = *FoundLog;
std::optional<CbObject> Op = Oplog.GetOpByKey(ObjId);
if (!Op.has_value())
@@ -2224,7 +2325,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req)
ExtendableStringBuilder<4096> CSVWriter;
CSVHeader(Details, AttachmentDetails, CSVWriter);
- CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN.value(), ObjId, Op.value(), CSVWriter);
+ CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN.value(), ObjId, Op.value(), CSVWriter);
HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView());
}
else
@@ -2232,7 +2333,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req)
CbObjectWriter Cbo;
Cbo.BeginArray("ops");
{
- CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN.value(), ObjId, Op.value(), Cbo);
+ CbWriteOp(ChunkStore, Details, OpDetails, AttachmentDetails, LSN.value(), ObjId, Op.value(), Cbo);
}
Cbo.EndArray();
HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h
index 295defa5c..5782188e6 100644
--- a/src/zenserver/projectstore/httpprojectstore.h
+++ b/src/zenserver/projectstore/httpprojectstore.h
@@ -35,11 +35,7 @@ class ProjectStore;
class HttpProjectService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
- HttpProjectService(CidStore& Store,
- ProjectStore* InProjectStore,
- HttpStatusService& StatusService,
- HttpStatsService& StatsService,
- AuthMgr& AuthMgr);
+ HttpProjectService(ProjectStore* InProjectStore, HttpStatusService& StatusService, HttpStatsService& StatsService, AuthMgr& AuthMgr);
~HttpProjectService();
virtual const char* BaseUri() const override;
@@ -92,7 +88,6 @@ private:
inline LoggerRef Log() { return m_Log; }
LoggerRef m_Log;
- CidStore& m_CidStore;
HttpRequestRouter m_Router;
Ref<ProjectStore> m_ProjectStore;
HttpStatusService& m_StatusService;
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index 3728babb5..dba5cd4a7 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -337,7 +337,7 @@ private:
};
std::shared_ptr<RemoteProjectStore>
-CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
+CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath, bool Quiet)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -368,7 +368,7 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
}
else if (!Options.OidcExePath.empty())
{
- if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe)
+ if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url, Quiet); TokenProviderMaybe)
{
TokenProvider = TokenProviderMaybe.value();
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
index 8bf79d563..ac2d25b47 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.h
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -25,6 +25,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions
};
std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath);
+ const std::filesystem::path& TempFilePath,
+ bool Quiet);
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 53e687983..d6dd6ef9b 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -267,7 +267,7 @@ namespace {
ForceDisableBlocks,
ForceDisableTempBlocks,
AssumeHttp2};
- RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath);
+ RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath, /*Quiet*/ false);
}
if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
@@ -364,7 +364,7 @@ namespace {
ForceDisableTempBlocks,
AssumeHttp2,
MetaData};
- RemoteStore = CreateBuildsRemoteStore(Options, TempFilePath);
+ RemoteStore = CreateJupiterBuildsRemoteStore(Options, TempFilePath, /*Quiet*/ false);
}
if (!RemoteStore)
@@ -1176,7 +1176,7 @@ ProjectStore::Oplog::Flush()
}
void
-ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx)
+ProjectStore::Oplog::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
@@ -1492,7 +1492,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f
if (ValidationError != CbValidateError::None)
{
- ZEN_ERROR("validation error {} hit for oplog config at '{}'", int(ValidationError), StateFilePath);
+ ZEN_ERROR("validation error {} hit for oplog config at '{}'", ToString(ValidationError), StateFilePath);
return CbObject();
}
@@ -3168,7 +3168,7 @@ ProjectStore::Project::Read()
}
else
{
- ZEN_ERROR("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath);
+ ZEN_ERROR("validation error {} hit for '{}'", ToString(ValidationError), ProjectStateFilePath);
}
ReadAccessTimes();
@@ -3260,7 +3260,7 @@ ProjectStore::Project::ReadAccessTimes()
}
else
{
- ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, int(ValidationError), ProjectAccessTimesFilePath);
+ ZEN_WARN("project '{}': validation error {} hit for '{}'", Identifier, ToString(ValidationError), ProjectAccessTimesFilePath);
}
}
@@ -3575,7 +3575,7 @@ ProjectStore::Project::Flush()
}
void
-ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
+ProjectStore::Project::Scrub(ScrubContext& Ctx)
{
ZEN_MEMSCOPE(GetProjectstoreTag());
// Scrubbing needs to check all existing oplogs
@@ -3587,7 +3587,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx)
IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) {
if (!IsExpired(GcClock::TimePoint::min(), Ops))
{
- Ops.ScrubStorage(Ctx);
+ Ops.Scrub(Ctx);
}
});
}
@@ -3832,7 +3832,7 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const
//////////////////////////////////////////////////////////////////////////
-ProjectStore::ProjectStore(CidStore& Store,
+ProjectStore::ProjectStore(GetCidStoreFunc&& GetCidStore,
std::filesystem::path BasePath,
GcManager& Gc,
JobQueue& JobQueue,
@@ -3840,7 +3840,7 @@ ProjectStore::ProjectStore(CidStore& Store,
const Configuration& Config)
: m_Log(logging::Get("project"))
, m_Gc(Gc)
-, m_CidStore(Store)
+, m_GetCidStore(std::move(GetCidStore))
, m_JobQueue(JobQueue)
, m_OpenProcessCache(InOpenProcessCache)
, m_ProjectBasePath(BasePath)
@@ -3973,7 +3973,7 @@ ProjectStore::ScrubStorage(ScrubContext& Ctx)
}
for (const Ref<Project>& Project : Projects)
{
- Project->ScrubStorage(Ctx);
+ Project->Scrub(Ctx);
}
}
@@ -4025,6 +4025,8 @@ ProjectStore::OpenProject(std::string_view ProjectId)
}
}
+ CidStore& ChunkStore = m_GetCidStore(ProjectId);
+
RwLock::ExclusiveLockScope _(m_ProjectsLock);
if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end())
{
@@ -4041,7 +4043,7 @@ ProjectStore::OpenProject(std::string_view ProjectId)
Ref<Project>& Prj =
m_Projects
- .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
+ .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, ChunkStore, BasePath)))
.first->second;
Prj->Identifier = ProjectId;
Prj->Read();
@@ -4068,12 +4070,14 @@ ProjectStore::NewProject(const std::filesystem::path& BasePath,
ZEN_MEMSCOPE(GetProjectstoreTag());
ZEN_TRACE_CPU("Store::NewProject");
+ CidStore& ChunkStore = m_GetCidStore(ProjectId);
+
RwLock::ExclusiveLockScope _(m_ProjectsLock);
ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath);
Ref<Project>& Prj =
- m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath)))
+ m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, ChunkStore, BasePath)))
.first->second;
Prj->Identifier = ProjectId;
Prj->RootDir = RootDir;
@@ -4802,7 +4806,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
}
const IoHash Hash = IoHash::FromHexString(Cid);
- OutChunk = m_CidStore.FindChunkByCid(Hash);
+ OutChunk = Project->GetCidStore().FindChunkByCid(Hash);
if (!OutChunk)
{
@@ -4865,7 +4869,7 @@ ProjectStore::PutChunk(const std::string_view ProjectId,
}
FoundLog->CaptureAddedAttachments(std::vector<IoHash>{Hash});
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash);
+ CidStore::InsertResult Result = Project->GetCidStore().AddChunk(Chunk, Hash);
return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}};
}
@@ -4894,18 +4898,19 @@ ProjectStore::GetChunks(const std::string_view ProjectId,
}
Project->TouchOplog(OplogId);
+ CidStore& ChunkStore = Project->GetCidStore();
+
if (RequestObject["chunks"sv].IsArray())
{
// Legacy full chunks only by rawhash
- CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView();
-
+ CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView();
CbObjectWriter ResponseWriter;
ResponseWriter.BeginArray("chunks"sv);
for (CbFieldView FieldView : ChunksArray)
{
IoHash RawHash = FieldView.AsHash();
- IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
+ IoBuffer ChunkBuffer = ChunkStore.FindChunkByCid(RawHash);
if (ChunkBuffer)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
@@ -5057,7 +5062,7 @@ ProjectStore::GetChunks(const std::string_view ProjectId,
if (ChunkRequest.Input.Id.index() == 0)
{
const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id);
- IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash);
+ IoBuffer Payload = ChunkStore.FindChunkByCid(ChunkHash);
if (Payload)
{
ChunkRequest.Output.Exists = true;
@@ -5244,7 +5249,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie
return {HttpResponseCode::BadRequest, "Invalid payload format"};
}
- CidStore& ChunkStore = m_CidStore;
+ CidStore& ChunkStore = Project->GetCidStore();
RwLock AttachmentsLock;
tsl::robin_set<IoHash, IoHash::Hasher> Attachments;
@@ -5350,7 +5355,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
}
}
- CidStore& ChunkStore = m_CidStore;
+ CidStore& ChunkStore = Project->GetCidStore();
RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
ChunkStore,
@@ -5462,6 +5467,8 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Project->TouchOplog(OplogId);
+ CidStore& ChunkStore = Project->GetCidStore();
+
if (Method == "import"sv)
{
if (!AreDiskWritesAllowed())
@@ -5543,7 +5550,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
Oplog->CaptureAddedAttachments(WriteRawHashes);
- m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+ ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
}
HttpReq.WriteResponse(HttpResponseCode::OK);
return true;
@@ -5716,14 +5723,14 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
ResponseObj.EndArray();
}
- // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the new
- // chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have
+ // Ops that have moved chunks to a compressed buffer for storage in ChunkStore have been rewritten with references to the new
+ // chunk(s). Make sure we add the chunks to ChunkStore, and do it after we update the oplog so GC doesn't think we have
// unreferenced chunks.
for (auto It : AddedChunks)
{
const IoHash& RawHash = It.first;
AddedChunk& Chunk = It.second;
- CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash);
+ CidStore::InsertResult Result = ChunkStore.AddChunk(Chunk.Buffer, RawHash);
if (Result.New)
{
InlinedBytes += Chunk.RawSize;
@@ -5786,7 +5793,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
EmbedLooseFile,
Force,
IgnoreMissingAttachments](JobContext& Context) {
- RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ RemoteProjectStore::Result Result = SaveOplog(Project->GetCidStore(),
*ActualRemoteStore,
*Project.Get(),
*OplogPtr,
@@ -5801,7 +5808,8 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op
ZEN_INFO("SaveOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))
{
- throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
}
});
@@ -5830,19 +5838,26 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
}
std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+ CidStore& ChunkStore = Project.GetCidStore();
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
JobId JobId = m_JobQueue.QueueJob(
fmt::format("Import oplog '{}/{}'", Project.Identifier, Oplog.OplogId()),
- [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments, CleanOplog](
- JobContext& Context) {
+ [this,
+ ChunkStore = &ChunkStore,
+ ActualRemoteStore = std::move(RemoteStore),
+ OplogPtr = &Oplog,
+ Force,
+ IgnoreMissingAttachments,
+ CleanOplog](JobContext& Context) {
RemoteProjectStore::Result Result =
- LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context);
+ LoadOplog(*ChunkStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context);
auto Response = ConvertResult(Result);
ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))
{
- throw std::runtime_error(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second);
+ throw JobError(Response.second.empty() ? fmt::format("Status: {}", ToString(Response.first)) : Response.second,
+ (int)Response.first);
}
});
@@ -6875,6 +6890,11 @@ namespace testutils {
return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags);
}
+ ProjectStore::GetCidStoreFunc SingleChunkStore(CidStore& ChunkStore)
+ {
+ return [ChunkStore = &ChunkStore](std::string_view) -> CidStore& { return *ChunkStore; };
+ }
+
} // namespace testutils
TEST_CASE("project.opkeys")
@@ -6937,13 +6957,18 @@ TEST_CASE("project.store.create")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::string_view ProjectName("proj1"sv);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -6968,12 +6993,17 @@ TEST_CASE("project.store.lifetimes")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7031,12 +7061,17 @@ TEST_CASE_TEMPLATE("project.store.export",
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
@@ -7070,7 +7105,7 @@ TEST_CASE_TEMPLATE("project.store.export",
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
- RemoteProjectStore::Result ExportResult = SaveOplog(CidStore,
+ RemoteProjectStore::Result ExportResult = SaveOplog(ChunkStore,
*RemoteStore,
*Project.Get(),
*Oplog,
@@ -7087,7 +7122,7 @@ TEST_CASE_TEMPLATE("project.store.export",
ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {});
CHECK(OplogImport != nullptr);
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
+ RemoteProjectStore::Result ImportResult = LoadOplog(ChunkStore,
*RemoteStore,
*OplogImport,
/*Force*/ false,
@@ -7096,7 +7131,7 @@ TEST_CASE_TEMPLATE("project.store.export",
nullptr);
CHECK(ImportResult.ErrorCode == 0);
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(ChunkStore,
*RemoteStore,
*OplogImport,
/*Force*/ true,
@@ -7105,7 +7140,7 @@ TEST_CASE_TEMPLATE("project.store.export",
nullptr);
CHECK(ImportForceResult.ErrorCode == 0);
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
+ RemoteProjectStore::Result ImportCleanResult = LoadOplog(ChunkStore,
*RemoteStore,
*OplogImport,
/*Force*/ false,
@@ -7114,7 +7149,7 @@ TEST_CASE_TEMPLATE("project.store.export",
nullptr);
CHECK(ImportCleanResult.ErrorCode == 0);
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
+ RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(ChunkStore,
*RemoteStore,
*OplogImport,
/*Force*/ true,
@@ -7134,12 +7169,17 @@ TEST_CASE("project.store.gc")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -7335,12 +7375,17 @@ TEST_CASE("project.store.gc.prep")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore";
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root";
std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
@@ -7383,7 +7428,7 @@ TEST_CASE("project.store.gc.prep")
// Equivalent of a `prep` existance check call
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7397,7 +7442,7 @@ TEST_CASE("project.store.gc.prep")
// If a gc comes in between our prep and op write the chunks will be removed
for (auto Attachment : OpAttachments)
{
- CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7417,7 +7462,7 @@ TEST_CASE("project.store.gc.prep")
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7431,7 +7476,7 @@ TEST_CASE("project.store.gc.prep")
// Attachments should now be retained
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7445,7 +7490,7 @@ TEST_CASE("project.store.gc.prep")
// Attachments should now be retained across multiple GCs if retain time is still valud
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7457,7 +7502,7 @@ TEST_CASE("project.store.gc.prep")
}
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv);
@@ -7474,7 +7519,7 @@ TEST_CASE("project.store.gc.prep")
for (auto Attachment : OpAttachments)
{
- CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
{
@@ -7507,7 +7552,7 @@ TEST_CASE("project.store.gc.prep")
}
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
Sleep(200);
@@ -7522,7 +7567,7 @@ TEST_CASE("project.store.gc.prep")
}
for (auto Attachment : OpAttachments)
{
- CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
// This pass the retention time has expired and the last GC pass cleared the entries
@@ -7536,7 +7581,7 @@ TEST_CASE("project.store.gc.prep")
for (auto Attachment : OpAttachments)
{
- CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash()));
+ CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash()));
}
}
@@ -7550,12 +7595,17 @@ TEST_CASE("project.store.rpc.getchunks")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -8472,12 +8522,17 @@ TEST_CASE("project.store.partial.read")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
@@ -8650,12 +8705,17 @@ TEST_CASE("project.store.iterateoplog")
auto JobQueue = MakeJobQueue(1, ""sv);
OpenProcessCache ProcessCache;
GcManager Gc;
- CidStore CidStore(Gc);
+ CidStore ChunkStore(Gc);
CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
- CidStore.Initialize(CidConfig);
+ ChunkStore.Initialize(CidConfig);
std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
- ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{});
+ ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore),
+ BasePath,
+ Gc,
+ *JobQueue,
+ ProcessCache,
+ ProjectStore::Configuration{});
std::filesystem::path RootDir = TempDir.Path() / "root"sv;
std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv";
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 368da5ea4..eb27665f9 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -68,7 +68,9 @@ public:
{
};
- ProjectStore(CidStore& Store,
+ typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc;
+
+ ProjectStore(GetCidStoreFunc&& GetCidStore,
std::filesystem::path BasePath,
GcManager& Gc,
JobQueue& JobQueue,
@@ -156,7 +158,7 @@ public:
LoggerRef Log() { return m_OuterProject->Log(); }
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ void Scrub(ScrubContext& Ctx);
static uint64_t TotalSize(const std::filesystem::path& BasePath);
uint64_t TotalSize() const;
@@ -326,11 +328,13 @@ public:
Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath);
virtual ~Project();
+ CidStore& GetCidStore() { return m_CidStore; };
+
void Read();
void Write();
[[nodiscard]] static bool Exists(const std::filesystem::path& BasePath);
void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ void Scrub(ScrubContext& Ctx);
LoggerRef Log() const;
static uint64_t TotalSize(const std::filesystem::path& BasePath);
uint64_t TotalSize() const;
@@ -405,6 +409,7 @@ public:
LoggerRef Log() { return m_Log; }
const std::filesystem::path& BasePath() const { return m_ProjectBasePath; }
+ // GcStorage
virtual void ScrubStorage(ScrubContext& Ctx) override;
virtual GcStorageSize StorageSize() const override;
@@ -498,7 +503,7 @@ public:
private:
LoggerRef m_Log;
GcManager& m_Gc;
- CidStore& m_CidStore;
+ GetCidStoreFunc m_GetCidStore;
JobQueue& m_JobQueue;
OpenProcessCache& m_OpenProcessCache;
std::filesystem::path m_ProjectBasePath;
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index 744b861dd..a1c460bc0 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -1475,12 +1475,17 @@ namespace detail {
class UpstreamCacheImpl final : public UpstreamCache
{
+ struct EnqueuedRequest
+ {
+ UpstreamCacheRecord Record;
+ std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc;
+ };
+
public:
- UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+ UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
: m_Log(logging::Get("upstream"))
, m_Options(Options)
, m_CacheStore(CacheStore)
- , m_CidStore(CidStore)
{
}
@@ -1836,17 +1841,17 @@ public:
}
}
- virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
+ virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override
{
if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0)
{
if (!m_UpstreamThreads.empty())
{
- m_UpstreamQueue.Enqueue(std::move(CacheRecord));
+ m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc});
}
else
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc));
}
}
}
@@ -1900,7 +1905,7 @@ public:
}
private:
- void ProcessCacheRecord(UpstreamCacheRecord CacheRecord)
+ void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc)
{
ZEN_TRACE_CPU("Upstream::ProcessCacheRecord");
@@ -1918,7 +1923,7 @@ private:
for (const IoHash& ValueContentId : CacheRecord.ValueContentIds)
{
- if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId))
+ if (IoBuffer Payload = GetValueFunc(ValueContentId))
{
Payloads.push_back(Payload);
}
@@ -1970,19 +1975,19 @@ private:
for (;;)
{
- UpstreamCacheRecord CacheRecord;
- if (m_UpstreamQueue.WaitAndDequeue(CacheRecord))
+ EnqueuedRequest Request;
+ if (m_UpstreamQueue.WaitAndDequeue(Request))
{
try
{
- ProcessCacheRecord(std::move(CacheRecord));
+ ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc));
}
catch (const std::exception& Err)
{
ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'",
- CacheRecord.Namespace,
- CacheRecord.Key.Bucket,
- CacheRecord.Key.Hash,
+ Request.Record.Namespace,
+ Request.Record.Key.Bucket,
+ Request.Record.Key.Hash,
Err.what());
}
}
@@ -2076,7 +2081,7 @@ private:
LoggerRef Log() { return m_Log; }
- using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<EnqueuedRequest>;
struct RunState
{
@@ -2102,7 +2107,6 @@ private:
LoggerRef m_Log;
UpstreamCacheOptions m_Options;
ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
std::shared_mutex m_EndpointsMutex;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
@@ -2126,9 +2130,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con
}
std::unique_ptr<UpstreamCache>
-CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore)
{
- return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore);
+ return std::make_unique<UpstreamCacheImpl>(Options, CacheStore);
}
} // namespace zen
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h
index 26e5decac..e4b9a73ad 100644
--- a/src/zenserver/upstream/upstreamcache.h
+++ b/src/zenserver/upstream/upstreamcache.h
@@ -24,7 +24,6 @@ class AuthMgr;
class CbObjectView;
class CbPackage;
class CbObjectWriter;
-class CidStore;
class ZenCacheStore;
struct JupiterClientOptions;
class JupiterAccessTokenProvider;
@@ -162,6 +161,6 @@ struct UpstreamCacheOptions
bool WriteUpstream = true;
};
-std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore);
} // namespace zen
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 27ec4c690..5cab54acc 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -239,18 +239,18 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
CidStoreConfiguration Config;
Config.RootDirectory = m_DataRoot / "cas";
- m_CidStore = std::make_unique<CidStore>(m_GcManager);
- m_CidStore->Initialize(Config);
+ m_CidStores.insert_or_assign({}, std::make_unique<CidStore>(m_GcManager));
+ m_CidStores.at({})->Initialize(Config);
ZEN_INFO("instantiating project service");
- m_ProjectStore = new ProjectStore(*m_CidStore,
+ m_ProjectStore = new ProjectStore([this](std::string_view) -> CidStore& { return *m_CidStores.at({}).get(); },
m_DataRoot / "projects",
m_GcManager,
*m_JobQueue,
*m_OpenProcessCache,
ProjectStore::Configuration{});
- m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr});
+ m_HttpProjectService.reset(new HttpProjectService(m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr));
if (ServerOptions.WorksSpacesConfig.Enabled)
{
@@ -265,10 +265,15 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
if (ServerOptions.BuildStoreConfig.Enabled)
{
+ CidStoreConfiguration BuildCidConfig;
+ BuildCidConfig.RootDirectory = m_DataRoot / "builds_cas";
+ m_BuildCidStore = std::make_unique<CidStore>(m_GcManager);
+ m_BuildCidStore->Initialize(BuildCidConfig);
+
BuildStoreConfig BuildsCfg;
BuildsCfg.RootDirectory = m_DataRoot / "builds";
BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit;
- m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager);
+ m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager, *m_BuildCidStore);
}
if (ServerOptions.StructuredCacheConfig.Enabled)
@@ -357,17 +362,15 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_GcScheduler.Initialize(GcConfig);
// Create and register admin interface last to make sure all is properly initialized
- m_AdminService =
- std::make_unique<HttpAdminService>(m_GcScheduler,
- *m_JobQueue,
- m_CacheStore.Get(),
- m_CidStore.get(),
- m_ProjectStore,
- m_BuildStore.get(),
- HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
- .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
- .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
- ServerOptions);
+ m_AdminService = std::make_unique<HttpAdminService>(
+ m_GcScheduler,
+ *m_JobQueue,
+ m_CacheStore.Get(),
+ [this]() { Flush(); },
+ HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile,
+ .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log",
+ .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"},
+ ServerOptions);
m_Http->RegisterService(*m_AdminService);
return EffectiveBasePort;
@@ -419,10 +422,10 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions)
if (CbValidateError ValidationResult = ValidateCompactBinary(Manifest, CbValidateMode::All);
ValidationResult != CbValidateError::None)
{
- ZEN_WARN("Manifest validation failed: {}, state will be wiped", uint32_t(ValidationResult));
+ ZEN_WARN("Manifest validation failed: {}, state will be wiped", zen::ToString(ValidationResult));
WipeState = true;
- WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, uint32_t(ValidationResult));
+ WipeReason = fmt::format("Validation of manifest at '{}' failed: {}", ManifestPath, zen::ToString(ValidationResult));
}
else
{
@@ -547,6 +550,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
Config.AllowAutomaticCreationOfNamespaces = true;
Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled,
.EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled};
+
for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs)
{
const std::string& BucketName = It.first;
@@ -554,7 +558,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize,
.PayloadAlignment = ZenBucketConfig.PayloadAlignment,
.MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold,
- .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold};
+ .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold,
+ .LimitOverwrites = ZenBucketConfig.LimitOverwrites};
Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig);
}
Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize,
@@ -564,6 +569,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold,
Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold =
ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold,
+ Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.BucketConfig.LimitOverwrites;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes;
Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds;
Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds;
@@ -587,7 +593,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount);
}
- m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore);
+ m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore);
m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr);
m_UpstreamCache->Initialize();
@@ -651,19 +657,24 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
}
}
- m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore,
- *m_CidStore,
- m_StatsService,
- m_StatusService,
- *m_UpstreamCache,
- m_GcManager.GetDiskWriteBlocker(),
- *m_OpenProcessCache);
+ m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(
+ *m_CacheStore,
+ [this](std::string_view) -> CidStore& { return *m_CidStores.at({}).get(); },
+ m_StatsService,
+ m_StatusService,
+ *m_UpstreamCache,
+ m_GcManager.GetDiskWriteBlocker(),
+ *m_OpenProcessCache);
m_Http->RegisterService(*m_StructuredCacheService);
m_Http->RegisterService(*m_UpstreamService);
m_StatsReporter.AddProvider(m_CacheStore.Get());
- m_StatsReporter.AddProvider(m_CidStore.get());
+ for (const auto& It : m_CidStores)
+ {
+ m_StatsReporter.AddProvider(It.second.get());
+ }
+ m_StatsReporter.AddProvider(m_BuildCidStore.get());
}
void
@@ -836,6 +847,7 @@ ZenServer::Cleanup()
m_BuildStoreService.reset();
m_BuildStore = {};
+ m_BuildCidStore.reset();
m_StructuredCacheService.reset();
m_UpstreamService.reset();
@@ -847,7 +859,7 @@ ZenServer::Cleanup()
m_Workspaces.reset();
m_HttpProjectService.reset();
m_ProjectStore = {};
- m_CidStore.reset();
+ m_CidStores.clear();
m_AuthService.reset();
m_AuthMgr.reset();
m_Http = {};
@@ -1028,37 +1040,21 @@ ZenServer::CheckOwnerPid()
}
void
-ZenServer::ScrubStorage()
-{
- Stopwatch Timer;
- ZEN_INFO("Storage validation STARTING");
-
- WorkerThreadPool ThreadPool{1, "Scrub"};
- ScrubContext Ctx{ThreadPool};
- m_CidStore->ScrubStorage(Ctx);
- m_ProjectStore->ScrubStorage(Ctx);
- m_StructuredCacheService->ScrubStorage(Ctx);
-
- const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
-
- ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})",
- NiceTimeSpanMs(ElapsedTimeMs),
- NiceBytes(Ctx.ScrubbedBytes()),
- Ctx.ScrubbedChunks(),
- NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs));
-}
-
-void
ZenServer::Flush()
{
- if (m_CidStore)
- m_CidStore->Flush();
+ for (auto& It : m_CidStores)
+ {
+ It.second->Flush();
+ }
if (m_StructuredCacheService)
m_StructuredCacheService->Flush();
if (m_ProjectStore)
m_ProjectStore->Flush();
+
+ if (m_BuildCidStore)
+ m_BuildCidStore->Flush();
}
void
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 5cfa04ba1..0a17446ae 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -82,7 +82,6 @@ public:
void CheckStateExitFlag();
void CheckOwnerPid();
bool UpdateProcessMonitor();
- void ScrubStorage();
void Flush();
virtual void HandleStatusRequest(HttpServerRequest& Request) override;
@@ -116,19 +115,20 @@ private:
inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
static std::string_view ToString(ServerState Value);
- StatsReporter m_StatsReporter;
- Ref<HttpServer> m_Http;
- std::unique_ptr<AuthMgr> m_AuthMgr;
- std::unique_ptr<HttpAuthService> m_AuthService;
- HttpStatusService m_StatusService;
- HttpStatsService m_StatsService;
- GcManager m_GcManager;
- GcScheduler m_GcScheduler{m_GcManager};
- std::unique_ptr<CidStore> m_CidStore;
- Ref<ZenCacheStore> m_CacheStore;
- std::unique_ptr<OpenProcessCache> m_OpenProcessCache;
- HttpTestService m_TestService;
- std::unique_ptr<BuildStore> m_BuildStore;
+ StatsReporter m_StatsReporter;
+ Ref<HttpServer> m_Http;
+ std::unique_ptr<AuthMgr> m_AuthMgr;
+ std::unique_ptr<HttpAuthService> m_AuthService;
+ HttpStatusService m_StatusService;
+ HttpStatsService m_StatsService;
+ GcManager m_GcManager;
+ GcScheduler m_GcScheduler{m_GcManager};
+ tsl::robin_map<std::string, std::unique_ptr<CidStore>> m_CidStores;
+ Ref<ZenCacheStore> m_CacheStore;
+ std::unique_ptr<OpenProcessCache> m_OpenProcessCache;
+ HttpTestService m_TestService;
+ std::unique_ptr<CidStore> m_BuildCidStore;
+ std::unique_ptr<BuildStore> m_BuildStore;
#if ZEN_WITH_TESTS
HttpTestingService m_TestingService;