diff options
Diffstat (limited to 'zenstore')
| -rw-r--r-- | zenstore/CAS.cpp | 2 | ||||
| -rw-r--r-- | zenstore/basicfile.cpp | 194 | ||||
| -rw-r--r-- | zenstore/caslog.cpp | 51 | ||||
| -rw-r--r-- | zenstore/cidstore.cpp | 30 | ||||
| -rw-r--r-- | zenstore/filecas.cpp | 5 | ||||
| -rw-r--r-- | zenstore/include/zenstore/basicfile.h | 40 | ||||
| -rw-r--r-- | zenstore/include/zenstore/caslog.h | 10 | ||||
| -rw-r--r-- | zenstore/include/zenstore/cidstore.h | 5 |
8 files changed, 269 insertions, 68 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index eaf72cb41..1db2b50bf 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -50,7 +50,7 @@ CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& P void CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) { - for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It) { Callback(*It); } diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index fe54184cf..f41f04101 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -35,12 +35,19 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate) void BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec) { + Ec.clear(); + const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING; - const DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; + DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; const DWORD dwShareMode = FILE_SHARE_READ; const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; HANDLE hTemplateFile = nullptr; + if (IsCreate) + { + dwDesiredAccess |= DELETE; + } + HANDLE FileHandle = CreateFile(FileName.c_str(), dwDesiredAccess, dwShareMode, @@ -52,6 +59,8 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& if (FileHandle == INVALID_HANDLE_VALUE) { Ec = zen::MakeErrorCodeFromLastError(); + + return; } m_FileHandle = FileHandle; @@ -63,25 +72,37 @@ BasicFile::Close() if (m_FileHandle) { ::CloseHandle(m_FileHandle); + m_FileHandle = nullptr; } } void -BasicFile::Read(void* Data, uint64_t Size, uint64_t Offset) +BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) { - OVERLAPPED Ovl{}; + const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - Ovl.Offset = DWORD(Offset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(Offset >> 32); + while (BytesToRead) + { + const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize); - DWORD dwNumberOfBytesToRead = gsl::narrow<DWORD>(Size); - DWORD dwNumberOfBytesRead = 0; + OVERLAPPED Ovl{}; - BOOL Success = ::ReadFile(m_FileHandle, Data, dwNumberOfBytesToRead, &dwNumberOfBytesRead, &Ovl); + Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(FileOffset >> 32); - if (!Success) - { - ThrowLastError("Failed to read from file '{}'"_format(zen::PathFromHandle(m_FileHandle))); + DWORD dwNumberOfBytesRead = 0; + BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl); + + ZEN_ASSERT(dwNumberOfBytesRead == NumberOfBytesToRead); + + if (!Success) + { + ThrowLastError("Failed to read from file '{}'"_format(zen::PathFromHandle(m_FileHandle))); + } + + BytesToRead -= NumberOfBytesToRead; + FileOffset += NumberOfBytesToRead; + Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead; } } @@ -89,9 +110,7 @@ IoBuffer BasicFile::ReadAll() { IoBuffer Buffer(FileSize()); - - Read((void*)Buffer.Data(), Buffer.Size(), 0); - + Read(Buffer.MutableData(), Buffer.Size(), 0); return Buffer; } @@ -125,25 +144,57 @@ BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<voi } void -BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) +BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec) { - OVERLAPPED Ovl{}; + Ec.clear(); + + const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; + + while (Size) + { + const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); + + OVERLAPPED Ovl{}; + + Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(FileOffset >> 32); + + DWORD dwNumberOfBytesWritten = 0; - Ovl.Offset = DWORD(Offset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(Offset >> 32); + BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); - DWORD dwNumberOfBytesToWrite = gsl::narrow<DWORD>(Size); - DWORD dwNumberOfBytesWritten = 0; + if (!Success) + { + Ec = MakeErrorCodeFromLastError(); - BOOL Success = ::WriteFile(m_FileHandle, Data, dwNumberOfBytesToWrite, &dwNumberOfBytesWritten, &Ovl); + return; + } - if (!Success) + Size -= NumberOfBytesToWrite; + FileOffset += NumberOfBytesToWrite; + Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite; + } +} + +void +BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) +{ + std::error_code Ec; + Write(Data, Size, Offset, Ec); + + if (Ec) { - ThrowLastError("Failed to write to file '{}'"_format(zen::PathFromHandle(m_FileHandle))); + throw std::system_error(Ec, "Failed to write to file '{}'"_format(zen::PathFromHandle(m_FileHandle))); } } void +BasicFile::WriteAll(IoBuffer Data, std::error_code& Ec) +{ + Write(Data.Data(), Data.Size(), 0, Ec); +} + +void BasicFile::Flush() { FlushFileBuffers(m_FileHandle); @@ -158,6 +209,60 @@ BasicFile::FileSize() return uint64_t(liFileSize.QuadPart); } +////////////////////////////////////////////////////////////////////////// + +TemporaryFile::~TemporaryFile() +{ + Close(); +} + +void +TemporaryFile::Close() +{ + if (m_FileHandle) + { + // Mark file for deletion when final handle is closed + + FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; + + SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + + BasicFile::Close(); + } +} + +void +TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec) +{ + StringBuilder<64> TempName; + Oid::NewOid().ToString(TempName); + + m_TempPath = TempDirName / TempName.c_str(); + + const bool IsCreate = true; + + Open(m_TempPath, IsCreate, Ec); +} + +void +TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec) +{ + // We intentionally call the base class Close() since otherwise we'll end up + // deleting the temporary file + BasicFile::Close(); + + std::filesystem::rename(m_TempPath, FinalFileName, Ec); +} + +/* + ___________ __ + \__ ___/___ _______/ |_ ______ + | |_/ __ \ / ___/\ __\/ ___/ + | |\ ___/ \___ \ | | \___ \ + |____| \___ >____ > |__| /____ > + \/ \/ \/ +*/ + #if ZEN_WITH_TESTS TEST_CASE("BasicFile") @@ -169,6 +274,49 @@ TEST_CASE("BasicFile") CHECK_NOTHROW(File1.Open("zonk", true)); CHECK_NOTHROW(File1.Write("abcd", 4, 0)); CHECK(File1.FileSize() == 4); + { + IoBuffer Data = File1.ReadAll(); + CHECK(Data.Size() == 4); + CHECK_EQ(memcmp(Data.Data(), "abcd", 4), 0); + } + CHECK_NOTHROW(File1.Write("efgh", 4, 2)); + CHECK(File1.FileSize() == 6); + { + IoBuffer Data = File1.ReadAll(); + CHECK(Data.Size() == 6); + CHECK_EQ(memcmp(Data.Data(), "abefgh", 6), 0); + } +} + +TEST_CASE("TemporaryFile") +{ + ScopedCurrentDirectoryChange _; + + SUBCASE("DeleteOnClose") + { + TemporaryFile TmpFile; + std::error_code Ec; + TmpFile.CreateTemporary(std::filesystem::current_path(), Ec); + CHECK(!Ec); + CHECK(std::filesystem::exists(TmpFile.GetPath())); + TmpFile.Close(); + CHECK(std::filesystem::exists(TmpFile.GetPath()) == false); + } + + SUBCASE("MoveIntoPlace") + { + TemporaryFile TmpFile; + std::error_code Ec; + TmpFile.CreateTemporary(std::filesystem::current_path(), Ec); + CHECK(!Ec); + std::filesystem::path TempPath = TmpFile.GetPath(); + std::filesystem::path FinalPath = std::filesystem::current_path() / "final"; + CHECK(std::filesystem::exists(TempPath)); + TmpFile.MoveTemporaryIntoPlace(FinalPath, Ec); + CHECK(!Ec); + CHECK(std::filesystem::exists(TempPath) == false); + CHECK(std::filesystem::exists(FinalPath)); + } } void diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index dc6021544..2bac6affd 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -17,11 +17,8 @@ #include <gsl/gsl-lite.hpp> -#include <functional> - -struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- -#include <atlfile.h> #include <filesystem> +#include <functional> ////////////////////////////////////////////////////////////////////////// @@ -48,13 +45,12 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat { m_RecordSize = RecordSize; - const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING; + std::error_code Ec; + m_File.Open(FileName, IsCreate); - HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition); - - if (FAILED(hRes)) + if (Ec) { - throw std::system_error(GetLastError(), std::system_category(), "Failed to open log file '{}'"_format(FileName)); + throw std::system_error(Ec, "Failed to open log file '{}'"_format(FileName)); } uint64_t AppendOffset = 0; @@ -66,7 +62,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic); Header.Finalize(); - m_File.Write(&Header, sizeof Header); + m_File.Write(&Header, sizeof Header, 0); AppendOffset = sizeof(FileHeader); @@ -76,7 +72,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat { // Validate header and log contents and prepare for appending/replay FileHeader Header; - m_File.Read(&Header, sizeof Header); + m_File.Read(&Header, sizeof Header, 0); if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum())) { @@ -84,11 +80,8 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat throw std::runtime_error("Mangled log header"); } - ULONGLONG Sz; - m_File.GetSize(Sz); - AppendOffset = Sz; - - m_Header = Header; + AppendOffset = m_File.FileSize(); + m_Header = Header; } m_AppendOffset = AppendOffset; @@ -106,8 +99,7 @@ CasLogFile::Close() void CasLogFile::Replay(std::function<void(const void*)>&& Handler) { - ULONGLONG LogFileSize; - m_File.GetSize(LogFileSize); + uint64_t LogFileSize = m_File.FileSize(); // Ensure we end up on a clean boundary const uint64_t LogBaseOffset = sizeof(FileHeader); @@ -118,18 +110,16 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) return; } + // This should really be streaming the data rather than just + // reading it into memory, though we don't tend to get very + // large logs so it may not matter + const uint64_t LogDataSize = LogEntryCount * m_RecordSize; std::vector<uint8_t> ReadBuffer; ReadBuffer.resize(LogDataSize); - m_File.Seek(LogBaseOffset, FILE_BEGIN); - HRESULT hRes = m_File.Read(ReadBuffer.data(), gsl::narrow<DWORD>(LogDataSize)); - - if (FAILED(hRes)) - { - ThrowSystemException(hRes, "Failed to read log file"); - } + m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset); for (int i = 0; i < LogEntryCount; ++i) { @@ -140,11 +130,16 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) void CasLogFile::Append(const void* DataPointer, uint64_t DataSize) { - HRESULT hRes = m_File.Write(DataPointer, gsl::narrow<DWORD>(DataSize)); + ZEN_ASSERT(DataSize == m_RecordSize); + + uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize); + + std::error_code Ec; + m_File.Write(DataPointer, gsl::narrow<DWORD>(DataSize), AppendOffset, Ec); - if (FAILED(hRes)) + if (Ec) { - ThrowSystemException(hRes, "Failed to write to log file '{}'"_format(PathFromHandle(m_File))); + throw std::system_error(Ec, "Failed to write to log file '{}'"_format(PathFromHandle(m_File.Handle()))); } } diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 08a3192ff..df5c32d25 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -45,11 +45,23 @@ struct CidStore::Impl ZEN_ASSERT(Compressed != IoHash::Zero); RwLock::ExclusiveLockScope _(m_Lock); - m_CidMap.insert_or_assign(DecompressedId, Compressed); - // TODO: it's pretty wasteful to log even idempotent updates - // however we can't simply use the boolean returned by insert_or_assign - // since there's not a 1:1 mapping between compressed and uncompressed - // so if we want a last-write-wins policy then we have to log each update + + auto It = m_CidMap.try_emplace(DecompressedId, Compressed); + if (!It.second) + { + if (It.first.value() != Compressed) + { + It.first.value() = Compressed; + } + else + { + // No point logging an update that won't change anything + return; + } + } + + // It's not ideal to do this while holding the lock in case + // we end up in blocking I/O but that's for later LogMapping(DecompressedId, Compressed); } @@ -68,6 +80,10 @@ struct CidStore::Impl { CompressedHash = It->second; } + else + { + return {}; + } } ZEN_ASSERT(CompressedHash != IoHash::Zero); @@ -84,7 +100,7 @@ struct CidStore::Impl if (It == m_CidMap.end()) { - // Not in map, or tombstone + // Not in map return false; } @@ -171,7 +187,7 @@ struct CidStore::Impl const IoHash& BadHash = It->first; // Log a tombstone record - m_LogFile.Append({.Uncompressed = BadHash, .Compressed = IoHash::Zero}); + LogMapping(BadHash, IoHash::Zero); BadChunks.push_back(BadHash); diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index c036efd35..0b18848d5 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -7,6 +7,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> +#include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -133,6 +134,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); RenameInfo->FileName[FileName.size()] = 0; + auto $ = MakeGuard([&] { Memory::Free(RenameInfo); }); + // Try to move file into place BOOL Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize); @@ -175,8 +178,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize); } - Memory::Free(RenameInfo); - if (Success) { return CasStore::InsertResult{.New = true}; diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h index d4d65b366..fad4a33e1 100644 --- a/zenstore/include/zenstore/basicfile.h +++ b/zenstore/include/zenstore/basicfile.h @@ -26,6 +26,10 @@ class BasicFile public: BasicFile() = default; ~BasicFile(); + + BasicFile(const BasicFile&) = delete; + BasicFile& operator=(const BasicFile&) = delete; + void Open(std::filesystem::path FileName, bool IsCreate); void Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec); void Close(); @@ -33,15 +37,47 @@ public: void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); void Write(const void* Data, uint64_t Size, uint64_t FileOffset); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); void Flush(); uint64_t FileSize(); - void* Handle() { return m_FileHandle; } IoBuffer ReadAll(); + void WriteAll(IoBuffer Data, std::error_code& Ec); -private: + inline void* Handle() { return m_FileHandle; } + +protected: void* m_FileHandle = nullptr; // This is either null or valid }; +/** + * Simple abstraction for a temporary file + * + * Works like a regular BasicFile but implements a simple mechanism to allow creating + * a temporary file for writing in a directory which may later be moved atomically + * into the intended location after it has been fully written to. + * + */ + +class TemporaryFile : public BasicFile +{ +public: + TemporaryFile() = default; + ~TemporaryFile(); + + TemporaryFile(const TemporaryFile&) = delete; + TemporaryFile& operator=(const TemporaryFile&) = delete; + + void Close(); + void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec); + void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec); + const std::filesystem::path& GetPath() const { return m_TempPath; } + +private: + std::filesystem::path m_TempPath; + + using BasicFile::Open; +}; + ZENCORE_API void basicfile_forcelink(); } // namespace zen diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index 1fbda0265..00b987383 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -9,9 +9,9 @@ #include <zencore/thread.h> #include <zencore/uid.h> #include <zencore/windows.h> +#include <zenstore/basicfile.h> #include <zenstore/cas.h> -#include <atlfile.h> #include <functional> namespace zen { @@ -47,10 +47,10 @@ private: static_assert(sizeof(FileHeader) == 64); private: - CAtlFile m_File; - FileHeader m_Header; - size_t m_RecordSize = 1; - uint64_t m_AppendOffset = 0; + BasicFile m_File; + FileHeader m_Header; + size_t m_RecordSize = 1; + std::atomic<uint64_t> m_AppendOffset = 0; }; template<typename T> diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index f4439e083..5f567e7fc 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -26,6 +26,11 @@ class IoBuffer; * to support chunking then a CID may represent a list of chunks which could be concatenated * to form the referenced chunk. * + * It would likely be possible to implement this mapping in a more efficient way if we + * integrate it into the CAS store itself, so we can avoid maintaining copies of large + * hashes in multiple locations. This would also allow us to consolidate commit logs etc + * which would be more resilient than the current split log scheme + * */ class CidStore { |