diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-05 18:53:44 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-06 00:53:44 +0200 |
| commit | 832a1b464633ec7a31a8aad386520e1990d0b6cb (patch) | |
| tree | a07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenstore | |
| parent | retry file create (#383) (diff) | |
| download | zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip | |
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file
* rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag
* changelog
* log number of attachments to download
* add delay on jupiter request failure when retrying
* make sure we upload all attachments even if Needs are empty when ForceUpload is true
release TempAttachment as soon as it is used
* sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/blockstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 96 |
2 files changed, 27 insertions, 75 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index abf77f8a6..cdd7abae7 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -60,7 +60,7 @@ BlockStoreFile::Open() return true; }); void* FileHandle = m_File.Handle(); - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize()); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true); } void @@ -88,7 +88,7 @@ BlockStoreFile::Create(uint64_t InitialSize) // We map our m_IoBuffer beyond the file size as we will grow it over time and want // to be able to create sub-buffers of all the written range later - m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize); + m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false); } uint64_t @@ -100,7 +100,7 @@ BlockStoreFile::FileSize() void BlockStoreFile::MarkAsDeleteOnClose() { - m_IoBuffer.MarkAsDeleteOnClose(); + m_IoBuffer.SetDeleteOnClose(true); } IoBuffer diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index f641b899e..56a840701 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -221,27 +221,6 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN } } -#if ZEN_PLATFORM_WINDOWS -static void -DeletePayloadFileOnClose(const void* FileHandle) -{ - const HANDLE WinFileHandle = (const HANDLE)FileHandle; - // This will cause the file to be deleted when the last handle to it is closed - FILE_DISPOSITION_INFO Fdi{}; - Fdi.DeleteFile = TRUE; - BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); - - if (!Success) - { - // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed - // to delete it. - ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'", - PathFromHandle(WinFileHandle), - GetLastErrorAsString()); - } -} -#endif - CasStore::InsertResult FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { @@ -269,7 +248,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: // place in the file store directory, thus avoiding unnecessary copying IoBufferFileReference FileRef; - if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) + bool IsWholeFile = Chunk.IsWholeFile(); + if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef)) { { bool Exists = true; @@ -279,21 +259,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: } if (Exists) { -#if ZEN_PLATFORM_WINDOWS - DeletePayloadFileOnClose(FileRef.FileHandle); -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle); - if (unlink(FilePath.c_str()) < 0) - { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) - { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - FilePath.string(), - GetSystemErrorAsString(UnlinkError)); - } - } -#endif return CasStore::InsertResult{.New = false}; } } @@ -330,8 +295,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed); } - DeletePayloadFileOnClose(ChunkFileHandle); - return CasStore::InsertResult{.New = IsNew}; } else @@ -429,6 +392,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if (Success) { m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + Chunk.SetDeleteOnClose(false); HashLock.ReleaseNow(); @@ -441,7 +405,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: { m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } - return CasStore::InsertResult{.New = IsNew}; } @@ -450,7 +413,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS)) { HashLock.ReleaseNow(); - DeletePayloadFileOnClose(ChunkFileHandle); bool IsNew = false; { @@ -464,40 +426,44 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: return CasStore::InsertResult{.New = IsNew}; } - ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}", GetSystemErrorAsString(LastError), ChunkHash); - DeletePayloadFileOnClose(ChunkFileHandle); - #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); - std::filesystem::path DestPath = Name.ShardedPath.c_str(); - int Ret = link(SourcePath.c_str(), DestPath.c_str()); + std::filesystem::path DestPath = Name.ShardedPath.c_str(); + int Ret = link(SourcePath.c_str(), DestPath.c_str()); if (Ret < 0 && zen::GetLastError() == ENOENT) { // Destination directory doesn't exist. Create it any try again. CreateDirectories(DestPath.parent_path().c_str()); Ret = link(SourcePath.c_str(), DestPath.c_str()); } - int LinkError = zen::GetLastError(); - - if (unlink(SourcePath.c_str()) < 0) + if (Ret == 0) { - int UnlinkError = zen::GetLastError(); - if (UnlinkError != ENOENT) + m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()}); + Chunk.SetDeleteOnClose(false); + + HashLock.ReleaseNow(); + bool IsNew = false; { - ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'", - SourcePath.string(), - GetSystemErrorAsString(UnlinkError)); + RwLock::ExclusiveLockScope __(m_Lock); + IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; + } + if (IsNew) + { + m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); } + return CasStore::InsertResult{.New = IsNew}; } - - // It is possible that someone beat us to it in linking the file. In that - // case a "file exists" error is okay. All others are not. - if (Ret < 0) + else { + int LinkError = zen::GetLastError(); + + // It is possible that someone beat us to it in linking the file. In that + // case a "file exists" error is okay. All others are not. if (LinkError == EEXIST) { HashLock.ReleaseNow(); @@ -517,20 +483,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: GetSystemErrorAsString(LinkError), ChunkHash); } - else - { - HashLock.ReleaseNow(); - bool IsNew = false; - { - RwLock::ExclusiveLockScope __(m_Lock); - IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second; - } - if (IsNew) - { - m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed); - } - return CasStore::InsertResult{.New = IsNew}; - } #endif // ZEN_PLATFORM_* } |