aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-03-15 17:41:19 +0100
committerDan Engelbrecht <[email protected]>2022-03-31 11:28:32 +0200
commitce3ea816c2609ed4e5afc53aa47b3de7c9ab2bad (patch)
tree48247aaf9b0accfca5efbda98435ac42d7db8570
parentDelete GCd blocks on close. (diff)
downloadzen-ce3ea816c2609ed4e5afc53aa47b3de7c9ab2bad.tar.xz
zen-ce3ea816c2609ed4e5afc53aa47b3de7c9ab2bad.zip
Manage lifetime of FileHandle
-rw-r--r--zencore/iobuffer.cpp7
-rw-r--r--zenstore/basicfile.cpp14
-rw-r--r--zenstore/compactcas.cpp173
-rw-r--r--zenstore/compactcas.h33
-rw-r--r--zenstore/include/zenstore/basicfile.h7
5 files changed, 194 insertions, 40 deletions
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index e2aaa3169..2e39ef3cb 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -186,6 +186,10 @@ IoBufferExtendedCore::IoBufferExtendedCore(const IoBufferExtendedCore* Outer, ui
, m_FileHandle(Outer->m_FileHandle)
, m_FileOffset(Outer->m_FileOffset + Offset)
{
+ if (!m_FileHandle)
+ {
+ ZEN_ASSERT(false);
+ }
m_Flags.fetch_or(kIsOwnedByThis | kIsExtended, std::memory_order_relaxed);
}
@@ -220,7 +224,8 @@ IoBufferExtendedCore::~IoBufferExtendedCore()
if (!Success)
{
- ZEN_WARN("Error reported on file handle close!");
+ DWORD LastError = GetLastError();
+ ZEN_WARN("Error reported on file handle close, reason {}", GetSystemErrorAsString(LastError));
}
}
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index 895db6cee..1f6ead2d3 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -279,6 +279,20 @@ BasicFile::FileSize()
#endif
}
+void
+BasicFile::SetFileSize(uint64_t FileSize)
+{
+#if ZEN_PLATFORM_WINDOWS
+ LARGE_INTEGER liFileSize;
+ liFileSize.QuadPart = FileSize;
+ ::SetFilePointerEx(m_FileHandle, liFileSize, 0, FILE_BEGIN);
+ ::SetEndOfFile(m_FileHandle);
+#else
+ int Fd = int(intptr_t(m_FileHandle));
+ int ftruncate64(Fd, FileSize);
+#endif
+}
+
//////////////////////////////////////////////////////////////////////////
TemporaryFile::~TemporaryFile()
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index ed8ee23ce..a44f7a6c0 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -56,8 +56,117 @@ namespace {
#endif
}
+ void PreAllocateBlockSize(void* FileHandle, uint64_t Size)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ FILE_ALLOCATION_INFO info;
+ info.AllocationSize.QuadPart = Size; // 100GB
+ BOOL Success = SetFileInformationByHandle(FileHandle, FileAllocationInfo, &info, sizeof(info));
+ if (!Success)
+ {
+ ZEN_WARN("Failed set allocated size for '{}': '{}'", PathFromHandle(FileHandle), GetLastErrorAsString());
+ }
+#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+#endif
+ }
+
} // namespace
+ChunkBlock::ChunkBlock(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint16_t BlockIndex)
+: m_Path(BuildUcasPath(RootDirectory, ContainerBaseName, BlockIndex))
+{
+}
+
+ChunkBlock::~ChunkBlock()
+{
+ m_SmallObjectFile.Detach();
+}
+
+const std::filesystem::path
+ChunkBlock::GetPath() const
+{
+ return m_Path;
+}
+
+void
+ChunkBlock::Open()
+{
+ void* FileHandle = m_SmallObjectFile.Handle();
+ if (FileHandle != nullptr)
+ {
+ return;
+ }
+ m_SmallObjectFile.Open(m_Path, false);
+ FileHandle = m_SmallObjectFile.Handle();
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, m_SmallObjectFile.FileSize());
+}
+
+void
+ChunkBlock::Create(uint64_t InitialSize)
+{
+ void* FileHandle = m_SmallObjectFile.Handle();
+ if (FileHandle != nullptr)
+ {
+ return;
+ }
+ m_SmallObjectFile.Open(m_Path, true);
+ if (InitialSize > 0)
+ {
+ m_SmallObjectFile.SetFileSize(InitialSize);
+ }
+ FileHandle = m_SmallObjectFile.Handle();
+ m_IoBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, InitialSize);
+}
+
+uint64_t
+ChunkBlock::FileSize()
+{
+ return m_SmallObjectFile.FileSize();
+}
+
+void
+ChunkBlock::MarkAsDeleteOnClose()
+{
+ void* FileHandle = m_SmallObjectFile.Handle();
+ if (FileHandle == nullptr)
+ {
+ return;
+ }
+ MarkFileAsDeleteOnClose(FileHandle);
+}
+
+IoBuffer
+ChunkBlock::GetRange(uint64_t Offset, uint64_t Size)
+{
+ // What happens if we write to the file and it gets bigger and we
+ // request a new range that was not available when we called ChunkBlock::Open()
+ return IoBuffer(m_IoBuffer, Offset, Size);
+}
+
+void
+ChunkBlock::Read(void* Data, uint64_t Size, uint64_t FileOffset)
+{
+ m_SmallObjectFile.Read(Data, Size, FileOffset);
+}
+
+void
+ChunkBlock::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
+{
+ m_SmallObjectFile.Write(Data, Size, FileOffset);
+}
+
+void
+ChunkBlock::Flush()
+{
+ m_SmallObjectFile.Flush();
+}
+
+void
+ChunkBlock::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
+{
+ m_SmallObjectFile.StreamByteRange(FileOffset, Size, std::move(ChunkFun));
+}
+
CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config, CasGc& Gc)
: GcStorage(Gc)
, m_Config(Config)
@@ -116,9 +225,8 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const
}
}
m_CurrentBlockIndex = NewBlockIndex;
- std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
+ auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ SmallObjectFile->Create(m_MaxBlockSize);
m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
m_CurrentBlock = SmallObjectFile;
m_CurrentInsertOffset = 0;
@@ -155,7 +263,7 @@ CasContainerStrategy::FindChunk(const IoHash& ChunkHash)
if (auto BlockIt = m_OpenBlocks.find(Location.BlockIndex); BlockIt != m_OpenBlocks.end())
{
- return IoBufferBuilder::MakeFromFileHandle(BlockIt->second->Handle(), Location.Offset, Location.Size);
+ return BlockIt->second->GetRange(Location.Offset, Location.Size);
}
}
@@ -427,9 +535,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
}
}
m_CurrentBlockIndex = NewBlockIndex;
- std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
+ auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ SmallObjectFile->Create(m_MaxBlockSize);
m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
m_CurrentBlock = SmallObjectFile;
m_CurrentInsertOffset = 0;
@@ -438,7 +545,7 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
// Move all chunks in blocks that have chunks removed to new blocks
- std::shared_ptr<BasicFile> NewBlockFile;
+ std::shared_ptr<ChunkBlock> NewBlockFile;
uint64_t WriteOffset = {};
uint16_t NewBlockIndex = {};
std::unordered_map<IoHash, CasDiskLocation> MovedBlocks;
@@ -459,16 +566,13 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
RwLock::ExclusiveLockScope _i(m_InsertLock);
auto BlockFile = m_OpenBlocks[BlockIndex];
- auto FileHandle = BlockFile->Handle();
- // m_OpenBlocks.erase(BlockIndex);
- // BlockFile->Close();
- // fs::remove(BlockPath);
- ZEN_INFO("marking cas store file for delete {}, count limit {} exeeded", m_ContainerBaseName, PathFromHandle(FileHandle));
- MarkFileAsDeleteOnClose(FileHandle);
+ ZEN_INFO("marking cas store file for delete {}, block {}", m_ContainerBaseName, std::to_string(BlockIndex));
+ BlockFile->MarkAsDeleteOnClose();
+ m_OpenBlocks.erase(BlockIndex);
continue;
}
- std::shared_ptr<BasicFile> BlockFile;
+ std::shared_ptr<ChunkBlock> BlockFile;
{
RwLock::ExclusiveLockScope _i(m_InsertLock);
BlockFile = m_OpenBlocks[BlockIndex];
@@ -508,7 +612,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
return;
}
}
- m_OpenBlocks[NewBlockIndex] = std::shared_ptr<BasicFile>(); // Make sure nobody steals this slot
+ m_OpenBlocks[NewBlockIndex] = std::make_shared<ChunkBlock>(m_Config.RootDirectory,
+ m_ContainerBaseName,
+ NewBlockIndex); // Make sure nobody steals this slot
}
std::error_code Error;
@@ -530,9 +636,8 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
return;
}
- std::filesystem::path NewBlockPath = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex);
- NewBlockFile = std::make_shared<BasicFile>();
- NewBlockFile->Open(NewBlockPath, true);
+ NewBlockFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, NewBlockIndex);
+ NewBlockFile->Create(m_MaxBlockSize);
MovedBlocks.clear();
WriteOffset = 0;
}
@@ -559,12 +664,9 @@ CasContainerStrategy::CollectGarbage(GcContext& GcCtx)
m_CasLog.Append({.Key = MovedEntry.first, .Location = MovedEntry.second});
}
}
- auto FileHandle = BlockFile->Handle();
- // m_OpenBlocks.erase(BlockIndex);
- // BlockFile->Close();
- // fs::remove(BlockPath);
- ZEN_INFO("marking cas store file for delete {}, count limit {} exeeded", m_ContainerBaseName, PathFromHandle(FileHandle));
- MarkFileAsDeleteOnClose(FileHandle);
+ ZEN_INFO("marking cas store file for delete {}, block index {}", m_ContainerBaseName, BlockIndex);
+ BlockFile->MarkAsDeleteOnClose();
+ m_OpenBlocks.erase(BlockIndex);
}
}
@@ -732,7 +834,7 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
ReferencedBlockIndexes.insert(Location.BlockIndex);
}
- uint32_t SmallestBlockSize = 0xffffffffu;
+ uint32_t SmallestBlockSize = gsl::narrow<uint32_t>(m_MaxBlockSize);
for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.RootDirectory))
{
if (Entry.is_regular_file())
@@ -761,13 +863,19 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
std::filesystem::remove(Entry.path());
continue;
}
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(Entry.path(), false);
+ auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, BlockIndex);
+ SmallObjectFile->Open();
+ uint64_t FileSize = SmallObjectFile->FileSize();
+ if (FileSize < SmallestBlockSize)
+ {
+ m_CurrentBlockIndex = BlockIndex;
+ SmallestBlockSize = gsl::narrow<std::uint32_t>(FileSize);
+ }
m_OpenBlocks[BlockIndex] = SmallObjectFile;
- if (SmallObjectFile->FileSize() < SmallestBlockSize)
+ if (FileSize < SmallestBlockSize)
{
m_CurrentBlockIndex = BlockIndex;
- SmallestBlockSize = gsl::narrow<std::uint32_t>(SmallObjectFile->FileSize());
+ SmallestBlockSize = gsl::narrow<std::uint32_t>(FileSize);
}
}
catch (const std::invalid_argument&)
@@ -778,9 +886,8 @@ CasContainerStrategy::OpenContainer(bool IsNewStore)
}
if (m_OpenBlocks.empty())
{
- std::filesystem::path path = BuildUcasPath(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
- auto SmallObjectFile = std::make_shared<BasicFile>();
- SmallObjectFile->Open(path, true);
+ auto SmallObjectFile = std::make_shared<ChunkBlock>(m_Config.RootDirectory, m_ContainerBaseName, m_CurrentBlockIndex);
+ SmallObjectFile->Create(m_MaxBlockSize);
m_OpenBlocks[m_CurrentBlockIndex] = SmallObjectFile;
m_CurrentBlock = SmallObjectFile;
m_CurrentInsertOffset = 0;
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 35f118f34..70937bdb1 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -38,6 +38,27 @@ struct CasDiskIndexEntry
uint8_t Flags = 0;
};
+struct ChunkBlock
+{
+ ChunkBlock(const std::filesystem::path& RootDirectory, const std::string_view ContainerBaseName, uint16_t BlockIndex);
+ ~ChunkBlock();
+ const std::filesystem::path GetPath() const;
+ void Open();
+ void Create(uint64_t InitialSize);
+ void MarkAsDeleteOnClose();
+ uint64_t FileSize();
+ IoBuffer GetRange(uint64_t Offset, uint64_t Size);
+ void Read(void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
+ void Flush();
+ void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
+
+private:
+ std::filesystem::path m_Path;
+ BasicFile m_SmallObjectFile;
+ IoBuffer m_IoBuffer;
+};
+
#pragma pack(pop)
static_assert(sizeof(CasDiskIndexEntry) == 32);
@@ -81,12 +102,12 @@ private:
RwLock m_LocationMapLock;
std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap;
- RwLock m_InsertLock; // used to serialize inserts
- std::unordered_map<uint16_t, std::shared_ptr<BasicFile>> m_OpenBlocks;
- std::weak_ptr<BasicFile> m_CurrentBlock;
- uint16_t m_CurrentBlockIndex = 0;
- std::atomic_uint32_t m_CurrentInsertOffset{};
- std::atomic_uint64_t m_TotalSize{};
+ RwLock m_InsertLock; // used to serialize inserts
+ std::unordered_map<uint16_t, std::shared_ptr<ChunkBlock>> m_OpenBlocks;
+ std::weak_ptr<ChunkBlock> m_CurrentBlock;
+ uint16_t m_CurrentBlockIndex = 0;
+ std::atomic_uint32_t m_CurrentInsertOffset{};
+ std::atomic_uint64_t m_TotalSize{};
void MakeIndexSnapshot();
};
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index 2df016c76..9ee4ee512 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -43,10 +43,17 @@ public:
void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
void Flush();
uint64_t FileSize();
+ void SetFileSize(uint64_t FileSize);
IoBuffer ReadAll();
void WriteAll(IoBuffer Data, std::error_code& Ec);
inline void* Handle() { return m_FileHandle; }
+ inline void* Detach()
+ {
+ void* FileHandle = m_FileHandle;
+ m_FileHandle = 0;
+ return FileHandle;
+ }
protected:
void* m_FileHandle = nullptr; // This is either null or valid