aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-05 18:53:44 -0400
committerGitHub <[email protected]>2023-09-06 00:53:44 +0200
commit832a1b464633ec7a31a8aad386520e1990d0b6cb (patch)
treea07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenstore
parentretry file create (#383) (diff)
downloadzen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz
zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file * rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag * changelog * log number of attachments to download * add delay on jupiter request failure when retrying * make sure we upload all attachments even if Needs are empty when ForceUpload is true release TempAttachment as soon as it is used * sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenstore')
-rw-r--r--src/zenstore/blockstore.cpp6
-rw-r--r--src/zenstore/filecas.cpp96
2 files changed, 27 insertions, 75 deletions
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index abf77f8a6..cdd7abae7 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -60,7 +60,7 @@ BlockStoreFile::Open()
return true;
});
void* FileHandle = m_File.Handle();
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize());
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_File.FileSize(), /*IsWholeFile*/ true);
}
void
@@ -88,7 +88,7 @@ BlockStoreFile::Create(uint64_t InitialSize)
// We map our m_IoBuffer beyond the file size as we will grow it over time and want
// to be able to create sub-buffers of all the written range later
- m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize, false);
}
uint64_t
@@ -100,7 +100,7 @@ BlockStoreFile::FileSize()
void
BlockStoreFile::MarkAsDeleteOnClose()
{
- m_IoBuffer.MarkAsDeleteOnClose();
+ m_IoBuffer.SetDeleteOnClose(true);
}
IoBuffer
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index f641b899e..56a840701 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -221,27 +221,6 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
}
}
-#if ZEN_PLATFORM_WINDOWS
-static void
-DeletePayloadFileOnClose(const void* FileHandle)
-{
- const HANDLE WinFileHandle = (const HANDLE)FileHandle;
- // This will cause the file to be deleted when the last handle to it is closed
- FILE_DISPOSITION_INFO Fdi{};
- Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(WinFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
-
- if (!Success)
- {
- // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
- // to delete it.
- ZEN_WARN("Failed to flag CAS temporary payload file '{}' for deletion: '{}'",
- PathFromHandle(WinFileHandle),
- GetLastErrorAsString());
- }
-}
-#endif
-
CasStore::InsertResult
FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
@@ -269,7 +248,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
// place in the file store directory, thus avoiding unnecessary copying
IoBufferFileReference FileRef;
- if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
+ bool IsWholeFile = Chunk.IsWholeFile();
+ if (IsWholeFile && Chunk.GetFileReference(/* out */ FileRef))
{
{
bool Exists = true;
@@ -279,21 +259,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
}
if (Exists)
{
-#if ZEN_PLATFORM_WINDOWS
- DeletePayloadFileOnClose(FileRef.FileHandle);
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- std::filesystem::path FilePath = PathFromHandle(FileRef.FileHandle);
- if (unlink(FilePath.c_str()) < 0)
- {
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
- {
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- FilePath.string(),
- GetSystemErrorAsString(UnlinkError));
- }
- }
-#endif
return CasStore::InsertResult{.New = false};
}
}
@@ -330,8 +295,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
m_TotalSize.fetch_add(static_cast<uint64_t>(ChunkSize), std::memory_order::relaxed);
}
- DeletePayloadFileOnClose(ChunkFileHandle);
-
return CasStore::InsertResult{.New = IsNew};
}
else
@@ -429,6 +392,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if (Success)
{
m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+ Chunk.SetDeleteOnClose(false);
HashLock.ReleaseNow();
@@ -441,7 +405,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
{
m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
}
-
return CasStore::InsertResult{.New = IsNew};
}
@@ -450,7 +413,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
{
HashLock.ReleaseNow();
- DeletePayloadFileOnClose(ChunkFileHandle);
bool IsNew = false;
{
@@ -464,40 +426,44 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
return CasStore::InsertResult{.New = IsNew};
}
-
ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
GetSystemErrorAsString(LastError),
ChunkHash);
- DeletePayloadFileOnClose(ChunkFileHandle);
-
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+
std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
- std::filesystem::path DestPath = Name.ShardedPath.c_str();
- int Ret = link(SourcePath.c_str(), DestPath.c_str());
+ std::filesystem::path DestPath = Name.ShardedPath.c_str();
+ int Ret = link(SourcePath.c_str(), DestPath.c_str());
if (Ret < 0 && zen::GetLastError() == ENOENT)
{
// Destination directory doesn't exist. Create it any try again.
CreateDirectories(DestPath.parent_path().c_str());
Ret = link(SourcePath.c_str(), DestPath.c_str());
}
- int LinkError = zen::GetLastError();
-
- if (unlink(SourcePath.c_str()) < 0)
+ if (Ret == 0)
{
- int UnlinkError = zen::GetLastError();
- if (UnlinkError != ENOENT)
+ m_CasLog.Append({.Key = ChunkHash, .Size = Chunk.Size()});
+ Chunk.SetDeleteOnClose(false);
+
+ HashLock.ReleaseNow();
+ bool IsNew = false;
{
- ZEN_WARN("Failed to unlink CAS temporary payload file '{}': '{}'",
- SourcePath.string(),
- GetSystemErrorAsString(UnlinkError));
+ RwLock::ExclusiveLockScope __(m_Lock);
+ IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
+ }
+ if (IsNew)
+ {
+ m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
}
+ return CasStore::InsertResult{.New = IsNew};
}
-
- // It is possible that someone beat us to it in linking the file. In that
- // case a "file exists" error is okay. All others are not.
- if (Ret < 0)
+ else
{
+ int LinkError = zen::GetLastError();
+
+ // It is possible that someone beat us to it in linking the file. In that
+ // case a "file exists" error is okay. All others are not.
if (LinkError == EEXIST)
{
HashLock.ReleaseNow();
@@ -517,20 +483,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::
GetSystemErrorAsString(LinkError),
ChunkHash);
}
- else
- {
- HashLock.ReleaseNow();
- bool IsNew = false;
- {
- RwLock::ExclusiveLockScope __(m_Lock);
- IsNew = m_Index.insert({ChunkHash, IndexEntry{.Size = Chunk.Size()}}).second;
- }
- if (IsNew)
- {
- m_TotalSize.fetch_add(Chunk.Size(), std::memory_order::relaxed);
- }
- return CasStore::InsertResult{.New = IsNew};
- }
#endif // ZEN_PLATFORM_*
}