aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-05 18:53:44 -0400
committerGitHub <[email protected]>2023-09-06 00:53:44 +0200
commit832a1b464633ec7a31a8aad386520e1990d0b6cb (patch)
treea07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src
parentretry file create (#383) (diff)
downloadzen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz
zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file * rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag * changelog * log number of attachments to download * add delay on jupiter request failure when retrying * make sure we upload all attachments even if Needs are empty when ForceUpload is true release TempAttachment as soon as it is used * sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src')
-rw-r--r--src/zencore/filesystem.cpp2
-rw-r--r--src/zencore/include/zencore/iobuffer.h12
-rw-r--r--src/zencore/iobuffer.cpp29
-rw-r--r--src/zenhttp/httpshared.cpp3
-rw-r--r--src/zenserver/projectstore/httpprojectstore.cpp1
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp42
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.h3
-rw-r--r--src/zenserver/projectstore/projectstore.cpp15
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp131
-rw-r--r--src/zenserver/upstream/jupiter.cpp164
-rw-r--r--src/zenserver/upstream/jupiter.h8
-rw-r--r--src/zenstore/blockstore.cpp6
-rw-r--r--src/zenstore/filecas.cpp96
13 files changed, 297 insertions, 215 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index e17d83895..3311ba1b9 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -758,7 +758,7 @@ ReadFile(std::filesystem::path Path)
#endif
FileContents Contents;
- Contents.Data.emplace_back(IoBuffer(IoBuffer::File, Handle, 0, FileSizeBytes));
+ Contents.Data.emplace_back(IoBuffer(IoBuffer::File, Handle, 0, FileSizeBytes, /*IsWholeFile*/ true));
return Contents;
}
diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h
index bbc346f9b..fef78741f 100644
--- a/src/zencore/include/zencore/iobuffer.h
+++ b/src/zencore/include/zencore/iobuffer.h
@@ -270,20 +270,20 @@ struct IoBufferExtendedCore : public IoBufferCore
enum ExtendedFlags
{
- kOwnsFile = 1 << 16,
- kOwnsMmap = 1 << 17
+ kOwnsFile = 1 << 16,
+ kOwnsMmap = 1 << 17,
+ kDeleteOnClose = 1 << 18
};
void Materialize() const;
bool GetFileReference(IoBufferFileReference& OutRef) const;
- void MarkAsDeleteOnClose();
+ void SetDeleteOnClose(bool DeleteOnClose);
private:
void* m_FileHandle = nullptr;
uint64_t m_FileOffset = 0;
mutable void* m_MmapHandle = nullptr;
mutable void* m_MappedPointer = nullptr;
- bool m_DeleteOnClose = false;
};
inline IoBufferExtendedCore*
@@ -362,7 +362,7 @@ public:
memcpy(const_cast<void*>(m_Core->DataPointer()), DataPtr, SizeBytes);
}
- ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize);
+ ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize, bool IsWholeFile);
ZENCORE_API IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize);
inline explicit operator bool() const { return !m_Core->IsNull(); }
@@ -379,7 +379,7 @@ public:
inline void SetContentType(ZenContentType ContentType) { m_Core->SetContentType(ContentType); }
[[nodiscard]] inline ZenContentType GetContentType() const { return m_Core->GetContentType(); }
[[nodiscard]] ZENCORE_API bool GetFileReference(IoBufferFileReference& OutRef) const;
- void MarkAsDeleteOnClose();
+ void SetDeleteOnClose(bool DeleteOnClose);
inline MemoryView GetView() const { return MemoryView(m_Core->DataPointer(), m_Core->DataBytes()); }
inline MutableMemoryView GetMutableView() { return MutableMemoryView(m_Core->MutableDataPointer(), m_Core->DataBytes()); }
diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp
index 22bc61395..efec06f7f 100644
--- a/src/zencore/iobuffer.cpp
+++ b/src/zencore/iobuffer.cpp
@@ -217,7 +217,7 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
if (LocalFlags & kOwnsFile)
{
- if (m_DeleteOnClose)
+ if (LocalFlags & kDeleteOnClose)
{
#if ZEN_PLATFORM_WINDOWS
// Mark file for deletion when final handle is closed
@@ -438,9 +438,16 @@ IoBufferExtendedCore::GetFileReference(IoBufferFileReference& OutRef) const
}
void
-IoBufferExtendedCore::MarkAsDeleteOnClose()
+IoBufferExtendedCore::SetDeleteOnClose(bool DeleteOnClose)
{
- m_DeleteOnClose = true;
+ if (DeleteOnClose && (m_Flags & kOwnsFile))
+ {
+ m_Flags.fetch_or(kDeleteOnClose, std::memory_order_release);
+ }
+ else
+ {
+ m_Flags.fetch_and(~static_cast<uint32_t>(kDeleteOnClose), std::memory_order_release);
+ }
}
//////////////////////////////////////////////////////////////////////////
@@ -475,9 +482,10 @@ IoBuffer::IoBuffer(const IoBuffer& OuterBuffer, size_t Offset, size_t Size)
}
}
-IoBuffer::IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize)
+IoBuffer::IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize, bool IsWholeFile)
: m_Core(new IoBufferExtendedCore(FileHandle, ChunkFileOffset, ChunkSize, /* owned */ true))
{
+ m_Core->SetIsWholeFile(IsWholeFile);
}
IoBuffer::IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize)
@@ -506,11 +514,11 @@ IoBuffer::GetFileReference(IoBufferFileReference& OutRef) const
}
void
-IoBuffer::MarkAsDeleteOnClose()
+IoBuffer::SetDeleteOnClose(bool DeleteOnClose)
{
if (IoBufferExtendedCore* ExtCore = m_Core->ExtendedCore())
{
- ExtCore->MarkAsDeleteOnClose();
+ ExtCore->SetDeleteOnClose(DeleteOnClose);
}
}
@@ -615,9 +623,7 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of
#if ZEN_PLATFORM_WINDOWS
void* Fd = DataFile.Detach();
#endif
- IoBuffer Iob(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size);
- Iob.m_Core->SetIsWholeFile(Offset == 0 && Size == FileSize);
- return Iob;
+ return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size, Offset == 0 && Size == FileSize);
}
#if !ZEN_PLATFORM_WINDOWS
@@ -666,10 +672,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName)
Handle = (void*)uintptr_t(Fd);
#endif // ZEN_PLATFORM_WINDOWS
- IoBuffer Iob(IoBuffer::File, Handle, 0, FileSize);
- Iob.m_Core->SetIsWholeFile(true);
-
- return Iob;
+ return IoBuffer(IoBuffer::File, Handle, 0, FileSize, /*IsWholeFile*/ true);
}
IoHash
diff --git a/src/zenhttp/httpshared.cpp b/src/zenhttp/httpshared.cpp
index 4a67b69e7..9ee5dc0fb 100644
--- a/src/zenhttp/httpshared.cpp
+++ b/src/zenhttp/httpshared.cpp
@@ -417,7 +417,8 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart);
if (liFileSize.LowPart != INVALID_FILE_SIZE)
{
- FullFileBuffer = IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart));
+ FullFileBuffer =
+ IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart), /*IsWholeFile*/ true);
PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer);
}
}
diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp
index d28d49d1f..ff7a5f8b9 100644
--- a/src/zenserver/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/projectstore/httpprojectstore.cpp
@@ -973,6 +973,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req)
std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString();
if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath))
{
+ Data.SetDeleteOnClose(true);
return SharedBuffer(std::move(Data));
}
else
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index 4be58256c..2bfa6851b 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -15,16 +15,18 @@ using namespace std::literals;
class JupiterRemoteStore : public RemoteProjectStore
{
public:
- JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& Key,
- bool ForceDisableBlocks,
- bool ForceDisableTempBlocks)
+ JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& Key,
+ bool ForceDisableBlocks,
+ bool ForceDisableTempBlocks,
+ const std::filesystem::path& TempFilePath)
: m_CloudClient(std::move(CloudClient))
, m_Namespace(Namespace)
, m_Bucket(Bucket)
, m_Key(Key)
+ , m_TempFilePath(TempFilePath)
{
if (ForceDisableBlocks)
{
@@ -52,6 +54,10 @@ public:
for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
{
PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
+ if (!PutResult.Success)
+ {
+ Sleep(100 * (Attempt + 1));
+ }
}
}
@@ -77,6 +83,10 @@ public:
for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
{
PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
+ if (!PutResult.Success)
+ {
+ Sleep(100 * (Attempt + 1));
+ }
}
}
@@ -116,6 +126,10 @@ public:
for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeResult.Success; Attempt++)
{
FinalizeResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ if (!FinalizeResult.Success)
+ {
+ Sleep(100 * (Attempt + 1));
+ }
}
}
Result Result{ConvertResult(FinalizeResult)};
@@ -140,6 +154,10 @@ public:
for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
{
GetResult = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject);
+ if (!GetResult.Success)
+ {
+ Sleep(100 * (Attempt + 1));
+ }
}
}
@@ -179,7 +197,11 @@ public:
CloudCacheSession Session(m_CloudClient.Get());
for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
{
- GetResult = Session.GetCompressedBlob(m_Namespace, RawHash);
+ GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ if (!GetResult.Success)
+ {
+ Sleep(100 * (Attempt + 1));
+ }
}
}
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
@@ -236,12 +258,13 @@ private:
const std::string m_Namespace;
const std::string m_Bucket;
const IoHash m_Key;
+ std::filesystem::path m_TempFilePath;
bool m_EnableBlocks = true;
bool m_UseTempBlocks = true;
};
std::unique_ptr<RemoteProjectStore>
-CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options)
+CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
std::string Url = Options.Url;
if (Url.find("://"sv) == std::string::npos)
@@ -281,7 +304,8 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options)
Options.Bucket,
Options.Key,
Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks);
+ Options.ForceDisableTempBlocks,
+ TempFilePath);
return RemoteStore;
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h
index f2375a730..d6fced91b 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.h
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h
@@ -22,6 +22,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions
bool AssumeHttp2 = false;
};
-std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options);
+std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath);
} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index e42afdb67..7754f61cd 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -72,10 +72,11 @@ namespace {
} while (true);
}
- std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
- AuthMgr& AuthManager,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize)
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ const std::filesystem::path& TempFilePath)
{
using namespace std::literals;
@@ -162,7 +163,7 @@ namespace {
ForceDisableBlocks,
ForceDisableTempBlocks,
AssumeHttp2};
- RemoteStore = CreateJupiterRemoteStore(Options);
+ RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath);
}
if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
@@ -2700,7 +2701,7 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
if (RemoteStoreResult.first == nullptr)
{
@@ -2742,7 +2743,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
bool Force = Params["force"sv].AsBool(false);
std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
- CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath());
if (RemoteStoreResult.first == nullptr)
{
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index bbf3a9f32..f10d7da63 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -298,9 +298,9 @@ BuildContainer(CidStore& ChunkStore,
Offset += Buffer.GetSize();
}
void* FileHandle = BlockFile.Detach();
- AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
- AttachmentBuffer.MarkAsDeleteOnClose();
+ AttachmentBuffer.SetDeleteOnClose(true);
ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize));
}
OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer);
@@ -389,13 +389,24 @@ BuildContainer(CidStore& ChunkStore,
return ChunkStore.FindChunkByCid(AttachmentHash);
};
- for (const auto& It : Attachments)
+ // Sort attachments so we get predictable blocks for the same oplog upload
+ std::vector<IoHash> SortedAttachments;
+ SortedAttachments.reserve(Attachments.size());
+ for (const auto It : Attachments)
{
- const IoHash& AttachmentHash(It.first);
- IoBuffer Payload = GetPayload(AttachmentHash);
+ SortedAttachments.push_back(It.first);
+ }
+ std::sort(SortedAttachments.begin(), SortedAttachments.end());
+
+ for (const IoHash& AttachmentHash : SortedAttachments)
+ {
+ IoBuffer Payload = GetPayload(AttachmentHash);
if (!Payload)
{
- std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second);
+ auto It = Attachments.find(AttachmentHash);
+ ZEN_ASSERT(It != Attachments.end());
+
+ std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second);
ZEN_ASSERT(Op.has_value());
ExtendableStringBuilder<1024> Sb;
Sb.Append("Failed to find attachment '");
@@ -672,7 +683,7 @@ SaveOplog(CidStore& ChunkStore,
Offset += Buffer.GetSize();
}
void* FileHandle = BlockFile.Detach();
- BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+ BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true);
}
catch (std::exception& Ex)
{
@@ -682,7 +693,7 @@ SaveOplog(CidStore& ChunkStore,
return;
}
- BlockBuffer.MarkAsDeleteOnClose();
+ BlockBuffer.SetDeleteOnClose(true);
{
RwLock::ExclusiveLockScope __(AttachmentsLock);
CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
@@ -757,7 +768,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
}
- if (!ContainerSaveResult.Needs.empty())
+ if (!ContainerSaveResult.Needs.empty() || ForceUpload)
{
ZEN_INFO("Filtering needed attachments...");
std::vector<IoHash> NeededLargeAttachments;
@@ -766,6 +777,7 @@ SaveOplog(CidStore& ChunkStore,
NeededOtherAttachments.reserve(CreatedBlocks.size());
if (ForceUpload)
{
+ // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not
NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
}
else
@@ -791,59 +803,63 @@ SaveOplog(CidStore& ChunkStore,
{
break;
}
- SaveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = std::move(BlockIt->second);
+ }
+ else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
+ {
+ Payload = LooseTmpFileIt->second;
+ TempAttachments.erase(LooseTmpFileIt);
+ }
- IoBuffer Payload;
- if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
- {
- Payload = std::move(BlockIt->second);
- }
- else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
- {
- Payload = std::move(LooseTmpFileIt->second);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
- RemoteResult.GetErrorReason(),
- RemoteResult.GetError());
- return;
- }
+ SaveAttachmentsLatch.AddCount(1);
+ WorkerPool.ScheduleWork([&ChunkStore,
+ &RemoteStore,
+ &SaveAttachmentsLatch,
+ &RemoteResult,
+ RawHash,
+ &CreatedBlocks,
+ TempPayload = std::move(Payload)]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash);
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
- RawHash,
- NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
- return;
- }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
RawHash,
NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
return;
- });
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+ return;
+ });
}
}
+ TempAttachments.clear();
if (!CreatedBlocks.empty())
{
@@ -967,7 +983,6 @@ SaveOplog(CidStore& ChunkStore,
ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
}
SaveAttachmentsLatch.Wait();
- TempAttachments.clear();
}
if (!RemoteResult.IsError())
@@ -1224,6 +1239,10 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O
RemoteProjectStore::Result Result =
SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+ if (!Attachments.empty())
+ {
+ ZEN_INFO("Found {} attachments to download", Attachments.size());
+ }
AttachmentsWorkLatch.CountDown();
while (!AttachmentsWorkLatch.Wait(1000))
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index 2137523b2..61d8a85cc 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -6,12 +6,14 @@
#include <zencore/compactbinary.h>
#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
-#include <zencore/string.h>
+#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
+#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
@@ -79,7 +81,7 @@ namespace detail {
{
return {.ElapsedSeconds = Response.elapsed,
.ErrorCode = static_cast<int32_t>(Response.status_code),
- .Reason = Response.reason,
+ .Reason = Response.reason.empty() ? Response.text : Response.reason,
.Success = false};
}
return {.Bytes = Response.downloaded_bytes,
@@ -89,6 +91,72 @@ namespace detail {
.Success = true};
}
+ cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer)
+ {
+ if (TempFolderPath.empty())
+ {
+ return Session.Get();
+ }
+
+ std::string PayloadString;
+ std::shared_ptr<BasicFile> PayloadFile;
+
+ auto _ = MakeGuard([&]() {
+ if (PayloadFile)
+ {
+ PayloadFile.reset();
+ std::filesystem::path TempPath = TempFolderPath / Name;
+ std::error_code Ec;
+ std::filesystem::remove(TempPath, Ec);
+ }
+ });
+
+ uint64_t Offset = 0;
+ Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) {
+ if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ {
+ std::filesystem::path TempPath = TempFolderPath / Name;
+ PayloadFile = std::make_shared<BasicFile>();
+ PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete);
+ PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset);
+ Offset += PayloadString.size();
+ PayloadString.clear();
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->Write(data.data(), data.size(), Offset);
+ Offset += data.size();
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ }});
+
+ cpr::Response Response = Session.Get();
+
+ if (!Response.error && IsHttpSuccessCode(Response.status_code))
+ {
+ if (PayloadFile)
+ {
+ uint64_t PayloadSize = PayloadFile->FileSize();
+ void* FileHandle = PayloadFile->Detach();
+ PayloadFile.reset();
+ OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true);
+ OutBuffer.SetDeleteOnClose(true);
+ }
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size());
+ }
+ return Response;
+ }
+
+ Response.text.swap(PayloadString);
+ return Response;
+ }
+
} // namespace detail
CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
@@ -145,8 +213,8 @@ CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId,
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -194,8 +262,8 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -211,12 +279,13 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key)
+CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
{
ZEN_TRACE_CPU("HordeClient::GetCompressedBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
+ std::string KeyString = Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -225,13 +294,14 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
Session.SetOption(cpr::Body{});
- cpr::Response Response = Session.Get();
+ IoBuffer Payload;
+ cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
ZEN_DEBUG("GET {}", Response);
CloudCacheResult Result = detail::ConvertResponse(Response);
if (Result.Success)
{
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Response = std::move(Payload);
}
else
{
@@ -245,8 +315,8 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -262,12 +332,17 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
}
CloudCacheResult
-CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash)
+CloudCacheSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
{
ZEN_TRACE_CPU("HordeClient::GetInlineBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
+ std::string KeyString = Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -276,13 +351,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
Session.SetOption(cpr::Body{});
- cpr::Response Response = Session.Get();
+ IoBuffer Payload;
+ cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
ZEN_DEBUG("GET {}", Response);
CloudCacheResult Result = detail::ConvertResponse(Response);
if (Result.Success)
{
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Response = std::move(Payload);
}
else
{
@@ -296,8 +372,8 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -357,8 +433,8 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -426,8 +502,8 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -495,8 +571,8 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -546,8 +622,8 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -612,8 +688,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -670,8 +746,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -719,8 +795,8 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -766,8 +842,8 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -822,8 +898,8 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash&
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -906,8 +982,8 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -959,8 +1035,8 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -1031,8 +1107,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -1096,8 +1172,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index cabb61a1f..4e7d11fd9 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -98,9 +98,13 @@ public:
CloudCacheResult Authenticate();
CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash);
+ CloudCacheResult GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath = {});
PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index abf77f8a6..cdd7abae7 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -60,7 +60,7 @@ BlockStoreFile::Open()
return true;
});
void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true);
}
void
@@ -88,7 +88,7 @@ BlockStoreFile::Create(uint64_t InitialSize)
// We map our m_IoBuffer beyond the file size as we will grow it over time and want
// to be able to create sub-buffers of all the written range later
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false);
}
uint64_t
@@ -100,7 +100,7 @@ BlockStoreFile::FileSize()
void
BlockStoreFile::MarkAsDeleteOnClose()
{
- m_IoBuffer.MarkAsDeleteOnClose();
+ m_IoBuffer.SetDeleteOnClose(true);
}
IoBuffer
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index f641b899e..56a840701 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -221,27 +221,6 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
}
}
-#if ZEN_PLATFORM_WINDOWS
-static void
-DeletePayloadFileOnClose(const void* FileHandle)
-{
- const HANDLE WinFileHandle = (const HANDLE)FileHandle;
- // This will cause the file to be deleted when the last handle to it is closed
- FILE_DISPOSITION_INFO Fdi{};
- Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
-
- if (!Success)
- {
- // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
- // to delete it.
- ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'",
- PathFromHandle(WinFileHandle),
- GetLastErrorAsString());
- }
-}
-#endif
-
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
@@ -269,7 +248,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
// place in the file store directory, thus avoiding unnecessary copying
IoBufferFileReference FileRef;
- if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
+ bool IsWholeFile = Chunk.IsWholeFile();
+ if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef))
{
{
bool Exists = true;
@@ -279,21 +259,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
if (Exists)
{
-#if ZEN_PLATFORM_WINDOWS
- DeletePayloadFileOnClose(FileRef.FileHandle);
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
- if (unlink(FilePath.c_str()) < 0)
- {
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
- {
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- FilePath.string(),
- GetSystemErrorAsString(UnlinkError));
- }
- }
-#endif
return CasStore::InsertResult{.New = false};
}
}
@@ -330,8 +295,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
}
- DeletePayloadFileOnClose(ChunkFileHandle);
-
return CasStore::InsertResult{.New = IsNew};
}
else
@@ -429,6 +392,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if (Success)
{
m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+ Chunk.SetDeleteOnClose(false);
HashLock.ReleaseNow();
@@ -441,7 +405,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
{
m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
}
-
return CasStore::InsertResult{.New = IsNew};
}
@@ -450,7 +413,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
{
HashLock.ReleaseNow();
- DeletePayloadFileOnClose(ChunkFileHandle);
bool IsNew = false;
{
@@ -464,40 +426,44 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
return CasStore::InsertResult{.New = IsNew};
}
-
ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
GetSystemErrorAsString(LastError),
ChunkHash);
- DeletePayloadFileOnClose(ChunkFileHandle);
-
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+
std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
- std::filesystem::path DestPath = Name.ShardedPath.c_str();
- int Ret = link(SourcePath.c_str(), DestPath.c_str());
+ std::filesystem::path DestPath = Name.ShardedPath.c_str();
+ int Ret = link(SourcePath.c_str(), DestPath.c_str());
if (Ret < 0 && zen::GetLastError() == ENOENT)
{
// Destination directory doesn't exist. Create it any try again.
CreateDirectories(DestPath.parent_path().c_str());
Ret = link(SourcePath.c_str(), DestPath.c_str());
}
- int LinkError = zen::GetLastError();
-
- if (unlink(SourcePath.c_str()) < 0)
+ if (Ret == 0)
{
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
+ m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+ Chunk.SetDeleteOnClose(false);
+
+ HashLock.ReleaseNow();
+ bool IsNew = false;
{
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- SourcePath.string(),
- GetSystemErrorAsString(UnlinkError));
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
}
+ return CasStore::InsertResult{.New = IsNew};
}
-
- // It is possible that someone beat us to it in linking the file. In that
- // case a "file exists" error is okay. All others are not.
- if (Ret < 0)
+ else
{
+ int LinkError = zen::GetLastError();
+
+ // It is possible that someone beat us to it in linking the file. In that
+ // case a "file exists" error is okay. All others are not.
if (LinkError == EEXIST)
{
HashLock.ReleaseNow();
@@ -517,20 +483,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
GetSystemErrorAsString(LinkError),
ChunkHash);
}
- else
- {
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
#endif // ZEN_PLATFORM_*
}