aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-18 17:31:15 +0200
committerGitHub Enterprise <[email protected]>2024-04-18 17:31:15 +0200
commit93e252c50db8947ea065fb5ea8ad17892ddc37d0 (patch)
tree6e0e95662d7319d439b7629b47686d48679a57f9 /src/zenserver
parentimproved lock file handling (#50) (diff)
downloadzen-de/safer-oplog-import.tar.xz
zen-de/safer-oplog-import.zip
safer oplog import (#52)de/safer-oplog-import
* reference cache gc update capture * When importing oplogs we now import all attachments first and (optionally clean) write the oplog on success
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp79
-rw-r--r--src/zenserver/projectstore/projectstore.h2
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp172
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h1
4 files changed, 194 insertions, 60 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 65d2730d8..e452c658e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -831,6 +831,38 @@ ProjectStore::Oplog::Update(const std::filesystem::path& MarkerPath)
Write();
}
+bool
+ProjectStore::Oplog::Reset()
+{
+ std::filesystem::path MovedDir;
+
+ {
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ m_Storage = {};
+ if (!PrepareDirectoryDelete(m_BasePath, MovedDir))
+ {
+ m_Storage = new OplogStorage(this, m_BasePath);
+ const bool StoreExists = m_Storage->Exists();
+ m_Storage->Open(/* IsCreate */ !StoreExists);
+ return false;
+ }
+ m_ChunkMap.clear();
+ m_MetaMap.clear();
+ m_FileMap.clear();
+ m_OpAddressMap.clear();
+ m_LatestOpMap.clear();
+ m_Storage = new OplogStorage(this, m_BasePath);
+ m_Storage->Open(true);
+ CleanDirectory(m_TempPath);
+ }
+ // Erase content on disk
+ if (!MovedDir.empty())
+ {
+ OplogStorage::Delete(MovedDir);
+ }
+ return true;
+}
+
void
ProjectStore::Oplog::ReplayLog()
{
@@ -1167,8 +1199,19 @@ void
ProjectStore::Oplog::EnableUpdateCapture()
{
m_OplogLock.WithExclusiveLock([&]() {
- m_UpdatedLSNs = std::make_unique<std::vector<int>>();
- m_NonGCAttachments = std::make_unique<std::vector<IoHash>>();
+ if (m_UpdateCaptureRefCounter == 0)
+ {
+ ZEN_ASSERT(!m_UpdatedLSNs);
+ ZEN_ASSERT(!m_NonGCAttachments);
+ m_UpdatedLSNs = std::make_unique<std::vector<int>>();
+ m_NonGCAttachments = std::make_unique<std::vector<IoHash>>();
+ }
+ else
+ {
+ ZEN_ASSERT(m_UpdatedLSNs);
+ ZEN_ASSERT(m_NonGCAttachments);
+ }
+ m_UpdateCaptureRefCounter++;
});
}
@@ -1176,8 +1219,15 @@ void
ProjectStore::Oplog::DisableUpdateCapture()
{
m_OplogLock.WithExclusiveLock([&]() {
- m_UpdatedLSNs.reset();
- m_NonGCAttachments.reset();
+ ZEN_ASSERT(m_UpdatedLSNs);
+ ZEN_ASSERT(m_NonGCAttachments);
+ ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+ m_UpdateCaptureRefCounter--;
+ if (m_UpdateCaptureRefCounter == 0)
+ {
+ m_UpdatedLSNs.reset();
+ m_NonGCAttachments.reset();
+ }
});
}
@@ -3581,6 +3631,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(RemoteStoreOptions::DefaultMaxChunkEmbedSize);
bool Force = Params["force"sv].AsBool(false);
bool IgnoreMissingAttachments = Params["ignoremissingattachments"sv].AsBool(false);
+ bool CleanOplog = Params["clean"].AsBool(false);
CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
@@ -3594,9 +3645,10 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
JobId JobId = m_JobQueue.QueueJob(
fmt::format("Import oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description),
- [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments](JobContext& Context) {
+ [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments, CleanOplog](
+ JobContext& Context) {
RemoteProjectStore::Result Result =
- LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, &Context);
+ LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context);
auto Response = ConvertResult(Result);
ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))
@@ -4258,11 +4310,22 @@ TEST_CASE_TEMPLATE("project.store.export",
ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {});
CHECK(OplogImport != nullptr);
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, false, false, nullptr);
+
+ RemoteProjectStore::Result ImportResult =
+ LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, nullptr);
CHECK(ImportResult.ErrorCode == 0);
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, *RemoteStore, *OplogImport, true, false, nullptr);
+ RemoteProjectStore::Result ImportForceResult =
+ LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ false, nullptr);
CHECK(ImportForceResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportCleanResult =
+ LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ false, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, nullptr);
+ CHECK(ImportCleanResult.ErrorCode == 0);
+
+ RemoteProjectStore::Result ImportForceCleanResult =
+ LoadOplog(CidStore, *RemoteStore, *OplogImport, /*Force*/ true, /*IgnoreMissingAttachments*/ false, /*CleanOplog*/ true, nullptr);
+ CHECK(ImportForceCleanResult.ErrorCode == 0);
}
TEST_CASE("project.store.gc")
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index e27bf6e49..75844f84e 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -82,6 +82,7 @@ public:
void Read();
void Write();
void Update(const std::filesystem::path& MarkerPath);
+ bool Reset();
struct ChunkInfo
{
@@ -175,6 +176,7 @@ public:
tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
+ uint32_t m_UpdateCaptureRefCounter = 0;
std::unique_ptr<std::vector<int>> m_UpdatedLSNs;
std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments;
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index cc9385f5e..65ef099e4 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -2094,13 +2094,13 @@ SaveOplog(CidStore& ChunkStore,
};
RemoteProjectStore::Result
-SaveOplogContainer(ProjectStore::Oplog& Oplog,
- const CbObject& ContainerObject,
- const std::function<bool(const IoHash& RawHash)>& HasAttachment,
- const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
- const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
- const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
- JobContext* OptionalContext)
+ParseOplogContainer(const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ CbObject& OutOplogSection,
+ JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2211,64 +2211,104 @@ SaveOplogContainer(ProjectStore::Oplog& Oplog,
"Section has unexpected data type",
"Failed to save oplog container"};
}
+ OutOplogSection = SectionObject;
+ }
+ return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
+}
- CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
- const uint64_t OpCount = OpsArray.Num();
+static RemoteProjectStore::Result
+WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
+{
+ using namespace std::literals;
- ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
+ Stopwatch Timer;
- const size_t OpsBatchSize = 8192;
- std::vector<uint8_t> OpsData;
- std::vector<size_t> OpDataOffsets;
- size_t OpsCompleteCount = 0;
+ CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+ const uint64_t OpCount = OpsArray.Num();
- OpsData.reserve(OpsBatchSize);
+ ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
- auto AppendBatch = [&]() {
- std::vector<CbObjectView> Ops;
- Ops.reserve(OpDataOffsets.size());
- for (size_t OpDataOffset : OpDataOffsets)
- {
- Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
- }
- std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
- OpsCompleteCount += OpLsns.size();
- OpsData.clear();
- OpDataOffsets.clear();
- ReportProgress(OptionalContext,
- fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
- OpCount,
- OpCount - OpsCompleteCount);
- };
+ const size_t OpsBatchSize = 8192;
+ std::vector<uint8_t> OpsData;
+ std::vector<size_t> OpDataOffsets;
+ size_t OpsCompleteCount = 0;
- BinaryWriter Writer;
- for (CbFieldView OpEntry : OpsArray)
- {
- CbObjectView Op = OpEntry.AsObjectView();
- Op.CopyTo(Writer);
- OpDataOffsets.push_back(OpsData.size());
- OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
- Writer.Reset();
+ OpsData.reserve(OpsBatchSize);
- if (OpDataOffsets.size() == OpsBatchSize)
- {
- AppendBatch();
- }
+ auto AppendBatch = [&]() {
+ std::vector<CbObjectView> Ops;
+ Ops.reserve(OpDataOffsets.size());
+ for (size_t OpDataOffset : OpDataOffsets)
+ {
+ Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
}
- if (!OpDataOffsets.empty())
+ std::vector<uint32_t> OpLsns = Oplog.AppendNewOplogEntries(Ops);
+ OpsCompleteCount += OpLsns.size();
+ OpsData.clear();
+ OpDataOffsets.clear();
+ ReportProgress(OptionalContext,
+ fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount),
+ OpCount,
+ OpCount - OpsCompleteCount);
+ };
+
+ BinaryWriter Writer;
+ for (CbFieldView OpEntry : OpsArray)
+ {
+ CbObjectView Op = OpEntry.AsObjectView();
+ Op.CopyTo(Writer);
+ OpDataOffsets.push_back(OpsData.size());
+ OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
+ Writer.Reset();
+
+ if (OpDataOffsets.size() == OpsBatchSize)
{
AppendBatch();
}
}
+ if (!OpDataOffsets.empty())
+ {
+ AppendBatch();
+ }
return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}
RemoteProjectStore::Result
+SaveOplogContainer(ProjectStore::Oplog& Oplog,
+ const CbObject& ContainerObject,
+ const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+ const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+ const std::function<void(const IoHash& RawHash)>& OnNeedAttachment,
+ const std::function<void(const ChunkedInfo&)>& OnChunkedAttachment,
+ JobContext* OptionalContext)
+{
+ using namespace std::literals;
+
+ Stopwatch Timer;
+ CbObject OplogSection;
+ RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OplogSection,
+ OptionalContext);
+ if (Result.ErrorCode != 0)
+ {
+ return Result;
+ }
+ Result = WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
+}
+
+RemoteProjectStore::Result
LoadOplog(CidStore& ChunkStore,
RemoteProjectStore& RemoteStore,
ProjectStore::Oplog& Oplog,
bool ForceDownload,
bool IgnoreMissingAttachments,
+ bool CleanOplog,
JobContext* OptionalContext)
{
using namespace std::literals;
@@ -2338,6 +2378,7 @@ LoadOplog(CidStore& ChunkStore,
return false;
};
auto OnNeedBlock = [&RemoteStore,
+ &Oplog,
&ChunkStore,
&NetworkWorkerPool,
&AttachmentsWorkLatch,
@@ -2353,6 +2394,9 @@ LoadOplog(CidStore& ChunkStore,
{
return;
}
+
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunks});
+
BlockCountToDownload++;
if (BlockHash == IoHash::Zero)
{
@@ -2495,6 +2539,7 @@ LoadOplog(CidStore& ChunkStore,
};
auto OnNeedAttachment = [&RemoteStore,
+ &Oplog,
&ChunkStore,
&NetworkWorkerPool,
&AttachmentsWorkLatch,
@@ -2515,6 +2560,8 @@ LoadOplog(CidStore& ChunkStore,
return;
}
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{RawHash});
+
AttachmentsWorkLatch.AddCount(1);
AttachmentCount.fetch_add(1);
NetworkWorkerPool.ScheduleWork([&RemoteStore,
@@ -2571,6 +2618,7 @@ LoadOplog(CidStore& ChunkStore,
std::vector<ChunkedInfo> FilesToDechunk;
auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) {
+ Oplog.CaptureAddedAttachments(std::vector<IoHash>{Chunked.RawHash});
if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash))
{
Oplog.CaptureAddedAttachments(Chunked.ChunkHashes);
@@ -2578,19 +2626,21 @@ LoadOplog(CidStore& ChunkStore,
}
};
- RemoteProjectStore::Result Result = SaveOplogContainer(Oplog,
- LoadContainerResult.ContainerObject,
- HasAttachment,
- OnNeedBlock,
- OnNeedAttachment,
- OnChunkedAttachment,
- OptionalContext);
+ CbObject OplogSection;
+ RemoteProjectStore::Result Result = // SaveOplogContainer(Oplog,
+ ParseOplogContainer(LoadContainerResult.ContainerObject,
+ HasAttachment,
+ OnNeedBlock,
+ OnNeedAttachment,
+ OnChunkedAttachment,
+ OplogSection,
+ OptionalContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
ReportMessage(OptionalContext,
- fmt::format("Wrote oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
+ fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
Attachments.size(),
BlockCountToDownload,
@@ -2741,6 +2791,24 @@ LoadOplog(CidStore& ChunkStore,
Result = RemoteResult.ConvertResult();
}
+ if (Result.ErrorCode == 0)
+ {
+ if (CleanOplog)
+ {
+ if (!Oplog.Reset())
+ {
+ Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
+ .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+ .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
+ ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
+ }
+ }
+ if (Result.ErrorCode == 0)
+ {
+ WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ }
+ }
+
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
ReportRemoteStoreStats(OptionalContext, RemoteStoreInfo, RemoteStore.GetStats(), TransferWallTimeMS);
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 6b83e526c..b00aa231f 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -158,6 +158,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
ProjectStore::Oplog& Oplog,
bool ForceDownload,
bool IgnoreMissingAttachments,
+ bool CleanOplog,
JobContext* OptionalContext);
CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks);