aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/buildstoragecache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/buildstoragecache.cpp')
-rw-r--r--src/zenutil/buildstoragecache.cpp59
1 files changed, 52 insertions, 7 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index c95215889..f273ac699 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -11,6 +11,7 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
@@ -27,13 +28,16 @@ public:
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
: m_HttpClient(HttpClient)
, m_Stats(Stats)
, m_Namespace(Namespace.empty() ? "none" : Namespace)
, m_Bucket(Bucket.empty() ? "none" : Bucket)
, m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
- , m_BackgroundWorkPool(1)
+ , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
+ , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background))
, m_PendingBackgroundWorkCount(1)
, m_CancelBackgroundWork(false)
{
@@ -44,8 +48,11 @@ public:
try
{
m_CancelBackgroundWork.store(true);
- m_PendingBackgroundWorkCount.CountDown();
- m_PendingBackgroundWorkCount.Wait();
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
}
catch (const std::exception& Ex)
{
@@ -86,6 +93,7 @@ public:
ZenContentType ContentType,
const CompositeBuffer& Payload) override
{
+ ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
ScheduleBackgroundWork(
[this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
@@ -132,6 +140,7 @@ public:
virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
{
+ ZEN_ASSERT(!IsFlushed);
ScheduleBackgroundWork([this,
BuildId = Oid(BuildId),
BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
@@ -329,6 +338,39 @@ public:
return {};
}
+ virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ {
+ if (IsFlushed)
+ {
+ return;
+ }
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ IsFlushed = true;
+ }
+ if (m_PendingBackgroundWorkCount.Wait(100))
+ {
+ return;
+ }
+ while (true)
+ {
+ intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
+ if (UpdateCallback(Remaining))
+ {
+ if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ {
+ UpdateCallback(0);
+ return;
+ }
+ }
+ else
+ {
+ m_CancelBackgroundWork.store(true);
+ }
+ }
+ }
+
private:
void AddStatistic(const HttpClient::Response& Result)
{
@@ -343,8 +385,10 @@ private:
const std::string m_Namespace;
const std::string m_Bucket;
const std::filesystem::path m_TempFolderPath;
+ const bool m_BoostBackgroundThreadCount;
+ bool IsFlushed = false;
- WorkerThreadPool m_BackgroundWorkPool;
+ WorkerThreadPool& m_BackgroundWorkPool;
Latch m_PendingBackgroundWorkCount;
std::atomic<bool> m_CancelBackgroundWork;
};
@@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient,
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
{
- return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath);
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
}
} // namespace zen