about summary refs log tree commit diff
path: root/src/zenserver
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2024-11-25 13:36:53 +0100
committer: GitHub Enterprise <[email protected]> 2024-11-25 13:36:53 +0100
commite75c5860277681be7b4a18d8d630f76c051719b4 (patch)
tree52533fe3c0e60cfe89e08ff5a4be00b215933670 /src/zenserver
parentadd missing projectstore expire time in gc log (#227) (diff)
downloadzen-e75c5860277681be7b4a18d8d630f76c051719b4.tar.xz
zen-e75c5860277681be7b4a18d8d630f76c051719b4.zip
stronger validation of payload existence (#229)
- Don't add RawSize and Size in ProjectStore::GetProjectFiles response if we can't get the payload
- Use validation of payload size/existence in all chunk fetch operations in file cas
- In project store oplog validate, make sure we can reach all the payloads
- Add threading to oplog validate request
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp5
-rw-r--r--src/zenserver/projectstore/projectstore.cpp147
-rw-r--r--src/zenserver/projectstore/projectstore.h2
3 files changed, 110 insertions, 44 deletions
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index 2954bcdc0..4babcd224 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/stream.h>
#include <zencore/trace.h>
#include <zenstore/zenstore.h>
+#include <zenutil/workerpools.h>
namespace zen {
@@ -1164,7 +1165,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
void
HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req)
{
- ZEN_TRACE_CPU("ProjectService::OplogOpNew");
+ ZEN_TRACE_CPU("ProjectService::OplogOpValidate");
using namespace std::literals;
@@ -1197,7 +1198,7 @@ HttpProjectService::HandleOplogValidateRequest(HttpRouterRequest& Req)
ProjectStore::Oplog& Oplog = *FoundLog;
std::atomic_bool CancelFlag = false;
- ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag);
+ ProjectStore::Oplog::ValidationResult Result = Oplog.Validate(CancelFlag, &GetSmallWorkerPool(EWorkloadType::Burst));
tsl::robin_map<Oid, std::string, Oid::Hasher> KeyNameLookup;
KeyNameLookup.reserve(Result.OpKeys.size());
for (const auto& It : Result.OpKeys)
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 1b48a542c..a43c9e0e2 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1323,7 +1323,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f
}
ProjectStore::Oplog::ValidationResult
-ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
+ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool)
{
using namespace std::literals;
@@ -1348,25 +1348,27 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
});
Result.OpCount = gsl::narrow<uint32_t>(Keys.size());
- for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
- {
+
+ RwLock ResultLock;
+
+ auto ValidateOne = [&](uint32_t OpIndex) {
const Oid& KeyHash = KeyHashes[OpIndex];
const std::string& Key = Keys[OpIndex];
const OplogEntryMapping& Mapping(Mappings[OpIndex]);
bool HasMissingEntries = false;
for (const ChunkMapping& Chunk : Mapping.Chunks)
{
- if (!m_CidStore.ContainsChunk(Chunk.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload)
{
- Result.MissingChunks.push_back({KeyHash, Chunk});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); });
HasMissingEntries = true;
}
}
for (const ChunkMapping& Meta : Mapping.Meta)
{
- if (!m_CidStore.ContainsChunk(Meta.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload)
{
- Result.MissingMetas.push_back({KeyHash, Meta});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); });
HasMissingEntries = true;
}
}
@@ -1377,15 +1379,15 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath;
if (!std::filesystem::is_regular_file(FilePath))
{
- Result.MissingFiles.push_back({KeyHash, File});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
HasMissingEntries = true;
}
}
else
{
- if (!m_CidStore.ContainsChunk(File.Hash))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload)
{
- Result.MissingFiles.push_back({KeyHash, File});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); });
HasMissingEntries = true;
}
}
@@ -1393,18 +1395,39 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag)
const std::vector<IoHash>& OpAttachments = Attachments[OpIndex];
for (const IoHash& Attachment : OpAttachments)
{
- if (!m_CidStore.ContainsChunk(Attachment))
+ if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload)
{
- Result.MissingAttachments.push_back({KeyHash, Attachment});
+ ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); });
HasMissingEntries = true;
}
}
if (HasMissingEntries)
{
- Result.OpKeys.push_back({KeyHash, Key});
+ ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); });
+ }
+ };
+
+ Latch WorkLatch(1);
+
+ for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
+ {
+ if (OptionalWorkerPool)
+ {
+ WorkLatch.AddCount(1);
+ OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() {
+ auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); });
+ ValidateOne(Index);
+ });
+ }
+ else
+ {
+ ValidateOne(OpIndex);
}
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
+
{
// Check if we were deleted while we were checking the references without a lock...
RwLock::SharedLockScope _(m_OplogLock);
@@ -1963,12 +1986,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
try
{
IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
- if (Payload)
+ if (!AsyncCallback(FileChunkIndex, Payload))
{
- if (!AsyncCallback(FileChunkIndex, Payload))
- {
- Result.store(false);
- }
+ Result.store(false);
}
}
catch (const std::exception& Ex)
@@ -3922,11 +3942,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
{
if (WantsSizeField)
{
- Sizes.resize(Ids.size(), 0u);
+ Sizes.resize(Ids.size(), (uint64_t)-1);
}
if (WantsRawSizeField)
{
- RawSizes.resize(Ids.size(), 0u);
+ RawSizes.resize(Ids.size(), (uint64_t)-1);
}
FoundLog->IterateChunks(
@@ -3934,20 +3954,35 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
[&](size_t Index, const IoBuffer& Payload) {
try
{
- uint64_t Size = Payload.GetSize();
- if (WantsRawSizeField)
+ if (Payload)
{
- uint64_t RawSize = Size;
- if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ if (WantsRawSizeField)
+ {
+ if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ IoHash _;
+ uint64_t RawSize;
+ if (!CompressedBuffer::ValidateCompressedHeader(Payload, _, RawSize))
+ {
+ ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.",
+ ProjectId,
+ OplogId,
+ Ids[Index]);
+ }
+ else
+ {
+ RawSizes[Index] = RawSize;
+ }
+ }
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Payload), __, RawSize);
+ Sizes[Index] = Payload.GetSize();
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ else
{
- Sizes[Index] = Size;
+ ZEN_WARN("oplog '{}/{}': failed getting payload for project file info for id {}.", ProjectId, OplogId, Ids[Index]);
}
}
catch (const std::exception& Ex)
@@ -3980,11 +4015,11 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId,
{
Response << "clientpath"sv << ClientPaths[Index];
}
- if (WantsSizeField)
+ if (WantsSizeField && Sizes[Index] != (uint64_t)-1)
{
Response << "size"sv << Sizes[Index];
}
- if (WantsRawSizeField)
+ if (WantsRawSizeField && RawSizes[Index] != (uint64_t)-1)
{
Response << "rawsize"sv << RawSizes[Index];
}
@@ -4062,20 +4097,50 @@ ProjectStore::GetProjectChunkInfos(const std::string_view ProjectId,
[&](size_t Index, const IoBuffer& Chunk) -> bool {
try
{
- uint64_t Size = Chunk.GetSize();
- if (WantsRawSizeField)
+ if (Chunk)
{
- uint64_t RawSize = Size;
- if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ if (WantsRawSizeField)
+ {
+ if (Chunk.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t RawSize;
+ IoHash RawHash;
+ if (!CompressedBuffer::ValidateCompressedHeader(Chunk, RawHash, RawSize))
+ {
+ ZEN_WARN("oplog '{}/{}': payload for project file info for id {} is not a valid compressed binary.",
+ ProjectId,
+ OplogId,
+ Ids[Index]);
+ }
+ else if (RawHash != Hashes[Index])
+ {
+ ZEN_WARN(
+ "oplog '{}/{}': payload for project file info for id {} does not match expected raw hash {}, found "
+ "{}.",
+ ProjectId,
+ OplogId,
+ Ids[Index],
+ Hashes[Index],
+ RawHash);
+ }
+ else
+ {
+ RawSizes[Index] = RawSize;
+ }
+ }
+ else
+ {
+ RawSizes[Index] = Chunk.GetSize();
+ }
+ }
+ if (WantsSizeField)
{
- IoHash __;
- (void)CompressedBuffer::FromCompressed(SharedBuffer(Chunk), __, RawSize);
+ Sizes[Index] = Chunk.GetSize();
}
- RawSizes[Index] = RawSize;
}
- if (WantsSizeField)
+ else
{
- Sizes[Index] = Size;
+ ZEN_WARN("oplog '{}/{}': failed getting payload for chunk for id {}", ProjectId, OplogId, Ids[Index]);
}
}
catch (const std::exception& Ex)
@@ -5782,7 +5847,7 @@ public:
if (Oplog != nullptr)
{
- Result = Oplog->Validate(Ctx.IsCancelledFlag);
+ Result = Oplog->Validate(Ctx.IsCancelledFlag, nullptr);
if (Ctx.IsCancelledFlag)
{
return;
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index 1619151dd..d8b13585b 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -212,7 +212,7 @@ public:
}
};
- ValidationResult Validate(std::atomic_bool& IsCancelledFlag);
+ ValidationResult Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPool* OptionalWorkerPool);
private:
struct FileMapEntry