diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-05 18:53:44 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-06 00:53:44 +0200 |
| commit | 832a1b464633ec7a31a8aad386520e1990d0b6cb (patch) | |
| tree | a07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src | |
| parent | retry file create (#383) (diff) | |
| download | zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip | |
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file
* rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag
* changelog
* log number of attachments to download
* add delay on jupiter request failure when retrying
* make sure we upload all attachments even if Needs are empty when ForceUpload is true
release TempAttachment as soon as it is used
* sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/filesystem.cpp | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/iobuffer.h | 12 | ||||
| -rw-r--r-- | src/zencore/iobuffer.cpp | 29 | ||||
| -rw-r--r-- | src/zenhttp/httpshared.cpp | 3 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 42 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.h | 3 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 15 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 131 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 164 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 8 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 96 |
13 files changed, 297 insertions, 215 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index e17d83895..3311ba1b9 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -758,7 +758,7 @@ ReadFile(std::filesystem::path Path) #endif FileContents Contents; - Contents.Data.emplace_back(IoBuffer(IoBuffer::File, Handle, 0, FileSizeBytes)); + Contents.Data.emplace_back(IoBuffer(IoBuffer::File, Handle, 0, FileSizeBytes, /*IsWholeFile*/ true)); return Contents; } diff --git a/src/zencore/include/zencore/iobuffer.h b/src/zencore/include/zencore/iobuffer.h index bbc346f9b..fef78741f 100644 --- a/src/zencore/include/zencore/iobuffer.h +++ b/src/zencore/include/zencore/iobuffer.h @@ -270,20 +270,20 @@ struct IoBufferExtendedCore : public IoBufferCore enum ExtendedFlags { - kOwnsFile = 1 << 16, - kOwnsMmap = 1 << 17 + kOwnsFile = 1 << 16, + kOwnsMmap = 1 << 17, + kDeleteOnClose = 1 << 18 }; void Materialize() const; bool GetFileReference(IoBufferFileReference& OutRef) const; - void MarkAsDeleteOnClose(); + void SetDeleteOnClose(bool DeleteOnClose); private: void* m_FileHandle = nullptr; uint64_t m_FileOffset = 0; mutable void* m_MmapHandle = nullptr; mutable void* m_MappedPointer = nullptr; - bool m_DeleteOnClose = false; }; inline IoBufferExtendedCore* @@ -362,7 +362,7 @@ public: memcpy(const_cast<void*>(m_Core->DataPointer()), DataPtr, SizeBytes); } - ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize); + ZENCORE_API IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize, bool IsWholeFile); ZENCORE_API IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize); inline explicit operator bool() const { return !m_Core->IsNull(); } @@ -379,7 +379,7 @@ public: inline void SetContentType(ZenContentType ContentType) { m_Core->SetContentType(ContentType); } [[nodiscard]] inline ZenContentType GetContentType() const { return m_Core->GetContentType(); } [[nodiscard]] ZENCORE_API bool GetFileReference(IoBufferFileReference& OutRef) const; - void MarkAsDeleteOnClose(); + void SetDeleteOnClose(bool DeleteOnClose); inline MemoryView GetView() const { return MemoryView(m_Core->DataPointer(), m_Core->DataBytes()); } inline MutableMemoryView GetMutableView() { return MutableMemoryView(m_Core->MutableDataPointer(), m_Core->DataBytes()); } diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index 22bc61395..efec06f7f 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -217,7 +217,7 @@ IoBufferExtendedCore::~IoBufferExtendedCore() if (LocalFlags & kOwnsFile) { - if (m_DeleteOnClose) + if (LocalFlags & kDeleteOnClose) { #if ZEN_PLATFORM_WINDOWS // Mark file for deletion when final handle is closed @@ -438,9 +438,16 @@ IoBufferExtendedCore::GetFileReference(IoBufferFileReference& OutRef) const } void -IoBufferExtendedCore::MarkAsDeleteOnClose() +IoBufferExtendedCore::SetDeleteOnClose(bool DeleteOnClose) { - m_DeleteOnClose = true; + if (DeleteOnClose && (m_Flags & kOwnsFile)) + { + m_Flags.fetch_or(kDeleteOnClose, std::memory_order_release); + } + else + { + m_Flags.fetch_and(~static_cast<uint32_t>(kDeleteOnClose), std::memory_order_release); + } } ////////////////////////////////////////////////////////////////////////// @@ -475,9 +482,10 @@ IoBuffer::IoBuffer(const IoBuffer& OuterBuffer, size_t Offset, size_t Size) } } -IoBuffer::IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize) +IoBuffer::IoBuffer(EFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize, bool IsWholeFile) : m_Core(new IoBufferExtendedCore(FileHandle, ChunkFileOffset, ChunkSize, /* owned */ true)) { + m_Core->SetIsWholeFile(IsWholeFile); } IoBuffer::IoBuffer(EBorrowedFileTag, void* FileHandle, uint64_t ChunkFileOffset, uint64_t ChunkSize) @@ -506,11 +514,11 @@ IoBuffer::GetFileReference(IoBufferFileReference& OutRef) const } void -IoBuffer::MarkAsDeleteOnClose() +IoBuffer::SetDeleteOnClose(bool DeleteOnClose) { if (IoBufferExtendedCore* ExtCore = m_Core->ExtendedCore()) { - ExtCore->MarkAsDeleteOnClose(); + ExtCore->SetDeleteOnClose(DeleteOnClose); } } @@ -615,9 +623,7 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of #if ZEN_PLATFORM_WINDOWS void* Fd = DataFile.Detach(); #endif - IoBuffer Iob(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size); - Iob.m_Core->SetIsWholeFile(Offset == 0 && Size == FileSize); - return Iob; + return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size, Offset == 0 && Size == FileSize); } #if !ZEN_PLATFORM_WINDOWS @@ -666,10 +672,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) Handle = (void*)uintptr_t(Fd); #endif // ZEN_PLATFORM_WINDOWS - IoBuffer Iob(IoBuffer::File, Handle, 0, FileSize); - Iob.m_Core->SetIsWholeFile(true); - - return Iob; + return IoBuffer(IoBuffer::File, Handle, 0, FileSize, /*IsWholeFile*/ true); } IoHash diff --git a/src/zenhttp/httpshared.cpp b/src/zenhttp/httpshared.cpp index 4a67b69e7..9ee5dc0fb 100644 --- a/src/zenhttp/httpshared.cpp +++ b/src/zenhttp/httpshared.cpp @@ -417,7 +417,8 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart); if (liFileSize.LowPart != INVALID_FILE_SIZE) { - FullFileBuffer = IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart)); + FullFileBuffer = + IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart), /*IsWholeFile*/ true); PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer); } } diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index d28d49d1f..ff7a5f8b9 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -973,6 +973,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) std::filesystem::path AttachmentPath = Oplog.TempPath() / AttachmentId.ToHexString(); if (IoBuffer Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath)) { + Data.SetDeleteOnClose(true); return SharedBuffer(std::move(Data)); } else diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index 4be58256c..2bfa6851b 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -15,16 +15,18 @@ using namespace std::literals; class JupiterRemoteStore : public RemoteProjectStore { public: - JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& Key, - bool ForceDisableBlocks, - bool ForceDisableTempBlocks) + JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& Key, + bool ForceDisableBlocks, + bool ForceDisableTempBlocks, + const std::filesystem::path& TempFilePath) : m_CloudClient(std::move(CloudClient)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_Key(Key) + , m_TempFilePath(TempFilePath) { if (ForceDisableBlocks) { @@ -52,6 +54,10 @@ public: for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) { PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + if (!PutResult.Success) + { + Sleep(100 * (Attempt + 1)); + } } } @@ -77,6 +83,10 @@ public: for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) { PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + if (!PutResult.Success) + { + Sleep(100 * (Attempt + 1)); + } } } @@ -116,6 +126,10 @@ public: for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeResult.Success; Attempt++) { FinalizeResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + if (!FinalizeResult.Success) + { + Sleep(100 * (Attempt + 1)); + } } } Result Result{ConvertResult(FinalizeResult)}; @@ -140,6 +154,10 @@ public: for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) { GetResult = Session.GetRef(m_Namespace, m_Bucket, m_Key, ZenContentType::kCbObject); + if (!GetResult.Success) + { + Sleep(100 * (Attempt + 1)); + } } } @@ -179,7 +197,11 @@ public: CloudCacheSession Session(m_CloudClient.Get()); for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) { - GetResult = Session.GetCompressedBlob(m_Namespace, RawHash); + GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + if (!GetResult.Success) + { + Sleep(100 * (Attempt + 1)); + } } } LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; @@ -236,12 +258,13 @@ private: const std::string m_Namespace; const std::string m_Bucket; const IoHash m_Key; + std::filesystem::path m_TempFilePath; bool m_EnableBlocks = true; bool m_UseTempBlocks = true; }; std::unique_ptr<RemoteProjectStore> -CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) +CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) @@ -281,7 +304,8 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options) Options.Bucket, Options.Key, Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks); + Options.ForceDisableTempBlocks, + TempFilePath); return RemoteStore; } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h index f2375a730..d6fced91b 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -22,6 +22,7 @@ struct JupiterRemoteStoreOptions : RemoteStoreOptions bool AssumeHttp2 = false; }; -std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options); +std::unique_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath); } // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index e42afdb67..7754f61cd 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -72,10 +72,11 @@ namespace { } while (true); } - std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, - AuthMgr& AuthManager, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize) + std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params, + AuthMgr& AuthManager, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + const std::filesystem::path& TempFilePath) { using namespace std::literals; @@ -162,7 +163,7 @@ namespace { ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2}; - RemoteStore = CreateJupiterRemoteStore(Options); + RemoteStore = CreateJupiterRemoteStore(Options, TempFilePath); } if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) @@ -2700,7 +2701,7 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); if (RemoteStoreResult.first == nullptr) { @@ -2742,7 +2743,7 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool Force = Params["force"sv].AsBool(false); std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = - CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); + CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize, Oplog.TempPath()); if (RemoteStoreResult.first == nullptr) { diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index bbf3a9f32..f10d7da63 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -298,9 +298,9 @@ BuildContainer(CidStore& ChunkStore, Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); - AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - AttachmentBuffer.MarkAsDeleteOnClose(); + AttachmentBuffer.SetDeleteOnClose(true); ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize)); } OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer); @@ -389,13 +389,24 @@ BuildContainer(CidStore& ChunkStore, return ChunkStore.FindChunkByCid(AttachmentHash); }; - for (const auto& It : Attachments) + // Sort attachments so we get predictable blocks for the same oplog upload + std::vector<IoHash> SortedAttachments; + SortedAttachments.reserve(Attachments.size()); + for (const auto It : Attachments) { - const IoHash& AttachmentHash(It.first); - IoBuffer Payload = GetPayload(AttachmentHash); + SortedAttachments.push_back(It.first); + } + std::sort(SortedAttachments.begin(), SortedAttachments.end()); + + for (const IoHash& AttachmentHash : SortedAttachments) + { + IoBuffer Payload = GetPayload(AttachmentHash); if (!Payload) { - std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second); + auto It = Attachments.find(AttachmentHash); + ZEN_ASSERT(It != Attachments.end()); + + std::optional<CbObject> Op = Oplog.GetOpByIndex(It->second); ZEN_ASSERT(Op.has_value()); ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); @@ -672,7 +683,7 @@ SaveOplog(CidStore& ChunkStore, Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); } catch (std::exception& Ex) { @@ -682,7 +693,7 @@ SaveOplog(CidStore& ChunkStore, return; } - BlockBuffer.MarkAsDeleteOnClose(); + BlockBuffer.SetDeleteOnClose(true); { RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); @@ -757,7 +768,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000))); } - if (!ContainerSaveResult.Needs.empty()) + if (!ContainerSaveResult.Needs.empty() || ForceUpload) { ZEN_INFO("Filtering needed attachments..."); std::vector<IoHash> NeededLargeAttachments; @@ -766,6 +777,7 @@ SaveOplog(CidStore& ChunkStore, NeededOtherAttachments.reserve(CreatedBlocks.size()); if (ForceUpload) { + // TODO: Check ForceUpload - it should add all attachments and blocks regardless if Needs is empty or not NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end()); } else @@ -791,59 +803,63 @@ SaveOplog(CidStore& ChunkStore, { break; } - SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = std::move(BlockIt->second); + } + else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) + { + Payload = LooseTmpFileIt->second; + TempAttachments.erase(LooseTmpFileIt); + } - IoBuffer Payload; - if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) - { - Payload = std::move(BlockIt->second); - } - else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) - { - Payload = std::move(LooseTmpFileIt->second); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + SaveAttachmentsLatch.AddCount(1); + WorkerPool.ScheduleWork([&ChunkStore, + &RemoteStore, + &SaveAttachmentsLatch, + &RemoteResult, + RawHash, + &CreatedBlocks, + TempPayload = std::move(Payload)]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } + IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", - RawHash, - NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); - return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", RawHash, NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); return; - }); + } + ZEN_DEBUG("Saved attachment {}, {} in {}", + RawHash, + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); + return; + }); } } + TempAttachments.clear(); if (!CreatedBlocks.empty()) { @@ -967,7 +983,6 @@ SaveOplog(CidStore& ChunkStore, ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); } SaveAttachmentsLatch.Wait(); - TempAttachments.clear(); } if (!RemoteResult.IsError()) @@ -1224,6 +1239,10 @@ LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::O RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment); + if (!Attachments.empty()) + { + ZEN_INFO("Found {} attachments to download", Attachments.size()); + } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index 2137523b2..61d8a85cc 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -6,12 +6,14 @@ #include <zencore/compactbinary.h> #include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> -#include <zencore/string.h> +#include <zencore/scopeguard.h> #include <zencore/thread.h> #include <zencore/trace.h> #include <zenhttp/formatters.h> +#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> @@ -79,7 +81,7 @@ namespace detail { { return {.ElapsedSeconds = Response.elapsed, .ErrorCode = static_cast<int32_t>(Response.status_code), - .Reason = Response.reason, + .Reason = Response.reason.empty() ? Response.text : Response.reason, .Success = false}; } return {.Bytes = Response.downloaded_bytes, @@ -89,6 +91,72 @@ namespace detail { .Success = true}; } + cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) + { + if (TempFolderPath.empty()) + { + return Session.Get(); + } + + std::string PayloadString; + std::shared_ptr<BasicFile> PayloadFile; + + auto _ = MakeGuard([&]() { + if (PayloadFile) + { + PayloadFile.reset(); + std::filesystem::path TempPath = TempFolderPath / Name; + std::error_code Ec; + std::filesystem::remove(TempPath, Ec); + } + }); + + uint64_t Offset = 0; + Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + std::filesystem::path TempPath = TempFolderPath / Name; + PayloadFile = std::make_shared<BasicFile>(); + PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); + PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); + Offset += PayloadString.size(); + PayloadString.clear(); + } + if (PayloadFile) + { + PayloadFile->Write(data.data(), data.size(), Offset); + Offset += data.size(); + } + else + { + PayloadString.append(data); + } + return true; + }}); + + cpr::Response Response = Session.Get(); + + if (!Response.error && IsHttpSuccessCode(Response.status_code)) + { + if (PayloadFile) + { + uint64_t PayloadSize = PayloadFile->FileSize(); + void* FileHandle = PayloadFile->Detach(); + PayloadFile.reset(); + OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); + OutBuffer.SetDeleteOnClose(true); + } + else + { + OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); + } + return Response; + } + + Response.text.swap(PayloadString); + return Response; + } + } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) @@ -145,8 +213,8 @@ CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -194,8 +262,8 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -211,12 +279,13 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + std::string KeyString = Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -225,13 +294,14 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); Session.SetOption(cpr::Body{}); - cpr::Response Response = Session.Get(); + IoBuffer Payload; + cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Response = std::move(Payload); } else { @@ -245,8 +315,8 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -262,12 +332,17 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K } CloudCacheResult -CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash) +CloudCacheSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + std::string KeyString = Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -276,13 +351,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); Session.SetOption(cpr::Body{}); - cpr::Response Response = Session.Get(); + IoBuffer Payload; + cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Response = std::move(Payload); } else { @@ -296,8 +372,8 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -357,8 +433,8 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -426,8 +502,8 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -495,8 +571,8 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -546,8 +622,8 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -612,8 +688,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -670,8 +746,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -719,8 +795,8 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -766,8 +842,8 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -822,8 +898,8 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -906,8 +982,8 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -959,8 +1035,8 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -1031,8 +1107,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -1096,8 +1172,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index cabb61a1f..4e7d11fd9 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -98,9 +98,13 @@ public: CloudCacheResult Authenticate(); CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); - CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); - CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash); + CloudCacheResult GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath = {}); PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index abf77f8a6..cdd7abae7 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -60,7 +60,7 @@ BlockStoreFile::Open() return true; }); void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true); } void @@ -88,7 +88,7 @@ BlockStoreFile::Create(uint64_t InitialSize) // We map our m_IoBuffer beyond the file size as we will grow it over time and want // to be able to create sub-buffers of all the written range later - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false); } uint64_t @@ -100,7 +100,7 @@ BlockStoreFile::FileSize() void BlockStoreFile::MarkAsDeleteOnClose() { - m_IoBuffer.MarkAsDeleteOnClose(); + m_IoBuffer.SetDeleteOnClose(true); } IoBuffer diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index f641b899e..56a840701 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -221,27 +221,6 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN } } -#if ZEN_PLATFORM_WINDOWS -static void -DeletePayloadFileOnClose(const void* FileHandle) -{ - const HANDLE WinFileHandle = (const HANDLE)FileHandle; - // This will cause the file to be deleted when the last handle to it is closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", - PathFromHandle(WinFileHandle), - GetLastErrorAsString()); - } -} -#endif - CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { @@ -269,7 +248,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; - if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) + bool IsWholeFile = Chunk.IsWholeFile(); + if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef)) { { bool Exists = true; @@ -279,21 +259,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } if (Exists) { -#if ZEN_PLATFORM_WINDOWS - DeletePayloadFileOnClose(FileRef.FileHandle); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); - if (unlink(FilePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - FilePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } -#endif return CasStore::InsertResult{.New = false}; } } @@ -330,8 +295,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); } - DeletePayloadFileOnClose(ChunkFileHandle); - return CasStore::InsertResult{.New = IsNew}; } else @@ -429,6 +392,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Success) { m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + Chunk.SetDeleteOnClose(false); HashLock.ReleaseNow(); @@ -441,7 +405,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } - return CasStore::InsertResult{.New = IsNew}; } @@ -450,7 +413,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) { HashLock.ReleaseNow(); - DeletePayloadFileOnClose(ChunkFileHandle); bool IsNew = false; { @@ -464,40 +426,44 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: return CasStore::InsertResult{.New = IsNew}; } - ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LastError), ChunkHash); - DeletePayloadFileOnClose(ChunkFileHandle); - #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); if (Ret < 0 && zen::GetLastError() == ENOENT) { // Destination directory doesn't exist. Create it any try again. CreateDirectories(DestPath.parent_path().c_str()); Ret = link(SourcePath.c_str(), DestPath.c_str()); } - int LinkError = zen::GetLastError(); - - if (unlink(SourcePath.c_str()) < 0) + if (Ret == 0) { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + Chunk.SetDeleteOnClose(false); + + HashLock.ReleaseNow(); + bool IsNew = false; { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - SourcePath.string(), - GetSystemErrorAsString(UnlinkError)); + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } + return CasStore::InsertResult{.New = IsNew}; } - - // It is possible that someone beat us to it in linking the file. In that - // case a "file exists" error is okay. All others are not. - if (Ret < 0) + else { + int LinkError = zen::GetLastError(); + + // It is possible that someone beat us to it in linking the file. In that + // case a "file exists" error is okay. All others are not. if (LinkError == EEXIST) { HashLock.ReleaseNow(); @@ -517,20 +483,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: GetSystemErrorAsString(LinkError), ChunkHash); } - else - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } #endif // ZEN_PLATFORM_* } |