aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
Diffstat (limited to 'zenstore')
-rw-r--r--zenstore/CAS.cpp2
-rw-r--r--zenstore/basicfile.cpp194
-rw-r--r--zenstore/caslog.cpp51
-rw-r--r--zenstore/cidstore.cpp30
-rw-r--r--zenstore/filecas.cpp5
-rw-r--r--zenstore/include/zenstore/basicfile.h40
-rw-r--r--zenstore/include/zenstore/caslog.h10
-rw-r--r--zenstore/include/zenstore/cidstore.h5
8 files changed, 269 insertions, 68 deletions
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index eaf72cb41..1db2b50bf 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -50,7 +50,7 @@ CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& P
void
CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
{
- for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd; ++It)
{
Callback(*It);
}
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index fe54184cf..f41f04101 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -35,12 +35,19 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate)
void
BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec)
{
+ Ec.clear();
+
const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING;
- const DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
+ DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
const DWORD dwShareMode = FILE_SHARE_READ;
const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
HANDLE hTemplateFile = nullptr;
+ if (IsCreate)
+ {
+ dwDesiredAccess |= DELETE;
+ }
+
HANDLE FileHandle = CreateFile(FileName.c_str(),
dwDesiredAccess,
dwShareMode,
@@ -52,6 +59,8 @@ BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code&
if (FileHandle == INVALID_HANDLE_VALUE)
{
Ec = zen::MakeErrorCodeFromLastError();
+
+ return;
}
m_FileHandle = FileHandle;
@@ -63,25 +72,37 @@ BasicFile::Close()
if (m_FileHandle)
{
::CloseHandle(m_FileHandle);
+ m_FileHandle = nullptr;
}
}
void
-BasicFile::Read(void* Data, uint64_t Size, uint64_t Offset)
+BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
{
- OVERLAPPED Ovl{};
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
- Ovl.Offset = DWORD(Offset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(Offset >> 32);
+ while (BytesToRead)
+ {
+ const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize);
- DWORD dwNumberOfBytesToRead = gsl::narrow<DWORD>(Size);
- DWORD dwNumberOfBytesRead = 0;
+ OVERLAPPED Ovl{};
- BOOL Success = ::ReadFile(m_FileHandle, Data, dwNumberOfBytesToRead, &dwNumberOfBytesRead, &Ovl);
+ Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(FileOffset >> 32);
- if (!Success)
- {
- ThrowLastError("Failed to read from file '{}'"_format(zen::PathFromHandle(m_FileHandle)));
+ DWORD dwNumberOfBytesRead = 0;
+ BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl);
+
+ ZEN_ASSERT(dwNumberOfBytesRead == NumberOfBytesToRead);
+
+ if (!Success)
+ {
+ ThrowLastError("Failed to read from file '{}'"_format(zen::PathFromHandle(m_FileHandle)));
+ }
+
+ BytesToRead -= NumberOfBytesToRead;
+ FileOffset += NumberOfBytesToRead;
+ Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead;
}
}
@@ -89,9 +110,7 @@ IoBuffer
BasicFile::ReadAll()
{
IoBuffer Buffer(FileSize());
-
- Read((void*)Buffer.Data(), Buffer.Size(), 0);
-
+ Read(Buffer.MutableData(), Buffer.Size(), 0);
return Buffer;
}
@@ -125,25 +144,57 @@ BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<voi
}
void
-BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
+BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
{
- OVERLAPPED Ovl{};
+ Ec.clear();
+
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
+
+ while (Size)
+ {
+ const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize);
+
+ OVERLAPPED Ovl{};
+
+ Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(FileOffset >> 32);
+
+ DWORD dwNumberOfBytesWritten = 0;
- Ovl.Offset = DWORD(Offset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(Offset >> 32);
+ BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
- DWORD dwNumberOfBytesToWrite = gsl::narrow<DWORD>(Size);
- DWORD dwNumberOfBytesWritten = 0;
+ if (!Success)
+ {
+ Ec = MakeErrorCodeFromLastError();
- BOOL Success = ::WriteFile(m_FileHandle, Data, dwNumberOfBytesToWrite, &dwNumberOfBytesWritten, &Ovl);
+ return;
+ }
- if (!Success)
+ Size -= NumberOfBytesToWrite;
+ FileOffset += NumberOfBytesToWrite;
+ Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite;
+ }
+}
+
+void
+BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
+{
+ std::error_code Ec;
+ Write(Data, Size, Offset, Ec);
+
+ if (Ec)
{
- ThrowLastError("Failed to write to file '{}'"_format(zen::PathFromHandle(m_FileHandle)));
+ throw std::system_error(Ec, "Failed to write to file '{}'"_format(zen::PathFromHandle(m_FileHandle)));
}
}
void
+BasicFile::WriteAll(IoBuffer Data, std::error_code& Ec)
+{
+ Write(Data.Data(), Data.Size(), 0, Ec);
+}
+
+void
BasicFile::Flush()
{
FlushFileBuffers(m_FileHandle);
@@ -158,6 +209,60 @@ BasicFile::FileSize()
return uint64_t(liFileSize.QuadPart);
}
+//////////////////////////////////////////////////////////////////////////
+
+TemporaryFile::~TemporaryFile()
+{
+ Close();
+}
+
+void
+TemporaryFile::Close()
+{
+ if (m_FileHandle)
+ {
+ // Mark file for deletion when final handle is closed
+
+ FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE};
+
+ SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+
+ BasicFile::Close();
+ }
+}
+
+void
+TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec)
+{
+ StringBuilder<64> TempName;
+ Oid::NewOid().ToString(TempName);
+
+ m_TempPath = TempDirName / TempName.c_str();
+
+ const bool IsCreate = true;
+
+ Open(m_TempPath, IsCreate, Ec);
+}
+
+void
+TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec)
+{
+ // We intentionally call the base class Close() since otherwise we'll end up
+ // deleting the temporary file
+ BasicFile::Close();
+
+ std::filesystem::rename(m_TempPath, FinalFileName, Ec);
+}
+
+/*
+ ___________ __
+ \__ ___/___ _______/ |_ ______
+ | |_/ __ \ / ___/\ __\/ ___/
+ | |\ ___/ \___ \ | | \___ \
+ |____| \___ >____ > |__| /____ >
+ \/ \/ \/
+*/
+
#if ZEN_WITH_TESTS
TEST_CASE("BasicFile")
@@ -169,6 +274,49 @@ TEST_CASE("BasicFile")
CHECK_NOTHROW(File1.Open("zonk", true));
CHECK_NOTHROW(File1.Write("abcd", 4, 0));
CHECK(File1.FileSize() == 4);
+ {
+ IoBuffer Data = File1.ReadAll();
+ CHECK(Data.Size() == 4);
+ CHECK_EQ(memcmp(Data.Data(), "abcd", 4), 0);
+ }
+ CHECK_NOTHROW(File1.Write("efgh", 4, 2));
+ CHECK(File1.FileSize() == 6);
+ {
+ IoBuffer Data = File1.ReadAll();
+ CHECK(Data.Size() == 6);
+ CHECK_EQ(memcmp(Data.Data(), "abefgh", 6), 0);
+ }
+}
+
+TEST_CASE("TemporaryFile")
+{
+ ScopedCurrentDirectoryChange _;
+
+ SUBCASE("DeleteOnClose")
+ {
+ TemporaryFile TmpFile;
+ std::error_code Ec;
+ TmpFile.CreateTemporary(std::filesystem::current_path(), Ec);
+ CHECK(!Ec);
+ CHECK(std::filesystem::exists(TmpFile.GetPath()));
+ TmpFile.Close();
+ CHECK(std::filesystem::exists(TmpFile.GetPath()) == false);
+ }
+
+ SUBCASE("MoveIntoPlace")
+ {
+ TemporaryFile TmpFile;
+ std::error_code Ec;
+ TmpFile.CreateTemporary(std::filesystem::current_path(), Ec);
+ CHECK(!Ec);
+ std::filesystem::path TempPath = TmpFile.GetPath();
+ std::filesystem::path FinalPath = std::filesystem::current_path() / "final";
+ CHECK(std::filesystem::exists(TempPath));
+ TmpFile.MoveTemporaryIntoPlace(FinalPath, Ec);
+ CHECK(!Ec);
+ CHECK(std::filesystem::exists(TempPath) == false);
+ CHECK(std::filesystem::exists(FinalPath));
+ }
}
void
diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp
index dc6021544..2bac6affd 100644
--- a/zenstore/caslog.cpp
+++ b/zenstore/caslog.cpp
@@ -17,11 +17,8 @@
#include <gsl/gsl-lite.hpp>
-#include <functional>
-
-struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive-
-#include <atlfile.h>
#include <filesystem>
+#include <functional>
//////////////////////////////////////////////////////////////////////////
@@ -48,13 +45,12 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
{
m_RecordSize = RecordSize;
- const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING;
+ std::error_code Ec;
+ m_File.Open(FileName, IsCreate);
- HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition);
-
- if (FAILED(hRes))
+ if (Ec)
{
- throw std::system_error(GetLastError(), std::system_category(), "Failed to open log file '{}'"_format(FileName));
+ throw std::system_error(Ec, "Failed to open log file '{}'"_format(FileName));
}
uint64_t AppendOffset = 0;
@@ -66,7 +62,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic);
Header.Finalize();
- m_File.Write(&Header, sizeof Header);
+ m_File.Write(&Header, sizeof Header, 0);
AppendOffset = sizeof(FileHeader);
@@ -76,7 +72,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
{
// Validate header and log contents and prepare for appending/replay
FileHeader Header;
- m_File.Read(&Header, sizeof Header);
+ m_File.Read(&Header, sizeof Header, 0);
if ((0 != memcmp(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic)) || (Header.Checksum != Header.ComputeChecksum()))
{
@@ -84,11 +80,8 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat
throw std::runtime_error("Mangled log header");
}
- ULONGLONG Sz;
- m_File.GetSize(Sz);
- AppendOffset = Sz;
-
- m_Header = Header;
+ AppendOffset = m_File.FileSize();
+ m_Header = Header;
}
m_AppendOffset = AppendOffset;
@@ -106,8 +99,7 @@ CasLogFile::Close()
void
CasLogFile::Replay(std::function<void(const void*)>&& Handler)
{
- ULONGLONG LogFileSize;
- m_File.GetSize(LogFileSize);
+ uint64_t LogFileSize = m_File.FileSize();
// Ensure we end up on a clean boundary
const uint64_t LogBaseOffset = sizeof(FileHeader);
@@ -118,18 +110,16 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
return;
}
+ // This should really be streaming the data rather than just
+ // reading it into memory, though we don't tend to get very
+ // large logs so it may not matter
+
const uint64_t LogDataSize = LogEntryCount * m_RecordSize;
std::vector<uint8_t> ReadBuffer;
ReadBuffer.resize(LogDataSize);
- m_File.Seek(LogBaseOffset, FILE_BEGIN);
- HRESULT hRes = m_File.Read(ReadBuffer.data(), gsl::narrow<DWORD>(LogDataSize));
-
- if (FAILED(hRes))
- {
- ThrowSystemException(hRes, "Failed to read log file");
- }
+ m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset);
for (int i = 0; i < LogEntryCount; ++i)
{
@@ -140,11 +130,16 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler)
void
CasLogFile::Append(const void* DataPointer, uint64_t DataSize)
{
- HRESULT hRes = m_File.Write(DataPointer, gsl::narrow<DWORD>(DataSize));
+ ZEN_ASSERT(DataSize == m_RecordSize);
+
+ uint64_t AppendOffset = m_AppendOffset.fetch_add(DataSize);
+
+ std::error_code Ec;
+ m_File.Write(DataPointer, gsl::narrow<DWORD>(DataSize), AppendOffset, Ec);
- if (FAILED(hRes))
+ if (Ec)
{
- ThrowSystemException(hRes, "Failed to write to log file '{}'"_format(PathFromHandle(m_File)));
+ throw std::system_error(Ec, "Failed to write to log file '{}'"_format(PathFromHandle(m_File.Handle())));
}
}
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 08a3192ff..df5c32d25 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -45,11 +45,23 @@ struct CidStore::Impl
ZEN_ASSERT(Compressed != IoHash::Zero);
RwLock::ExclusiveLockScope _(m_Lock);
- m_CidMap.insert_or_assign(DecompressedId, Compressed);
- // TODO: it's pretty wasteful to log even idempotent updates
- // however we can't simply use the boolean returned by insert_or_assign
- // since there's not a 1:1 mapping between compressed and uncompressed
- // so if we want a last-write-wins policy then we have to log each update
+
+ auto It = m_CidMap.try_emplace(DecompressedId, Compressed);
+ if (!It.second)
+ {
+ if (It.first.value() != Compressed)
+ {
+ It.first.value() = Compressed;
+ }
+ else
+ {
+ // No point logging an update that won't change anything
+ return;
+ }
+ }
+
+ // It's not ideal to do this while holding the lock in case
+ // we end up in blocking I/O but that's for later
LogMapping(DecompressedId, Compressed);
}
@@ -68,6 +80,10 @@ struct CidStore::Impl
{
CompressedHash = It->second;
}
+ else
+ {
+ return {};
+ }
}
ZEN_ASSERT(CompressedHash != IoHash::Zero);
@@ -84,7 +100,7 @@ struct CidStore::Impl
if (It == m_CidMap.end())
{
- // Not in map, or tombstone
+ // Not in map
return false;
}
@@ -171,7 +187,7 @@ struct CidStore::Impl
const IoHash& BadHash = It->first;
// Log a tombstone record
- m_LogFile.Append({.Uncompressed = BadHash, .Compressed = IoHash::Zero});
+ LogMapping(BadHash, IoHash::Zero);
BadChunks.push_back(BadHash);
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index c036efd35..0b18848d5 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -7,6 +7,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
+#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
@@ -133,6 +134,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR));
RenameInfo->FileName[FileName.size()] = 0;
+ auto $ = MakeGuard([&] { Memory::Free(RenameInfo); });
+
// Try to move file into place
BOOL Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize);
@@ -175,8 +178,6 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize);
}
- Memory::Free(RenameInfo);
-
if (Success)
{
return CasStore::InsertResult{.New = true};
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index d4d65b366..fad4a33e1 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -26,6 +26,10 @@ class BasicFile
public:
BasicFile() = default;
~BasicFile();
+
+ BasicFile(const BasicFile&) = delete;
+ BasicFile& operator=(const BasicFile&) = delete;
+
void Open(std::filesystem::path FileName, bool IsCreate);
void Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec);
void Close();
@@ -33,15 +37,47 @@ public:
void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
+ void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
void Flush();
uint64_t FileSize();
- void* Handle() { return m_FileHandle; }
IoBuffer ReadAll();
+ void WriteAll(IoBuffer Data, std::error_code& Ec);
-private:
+ inline void* Handle() { return m_FileHandle; }
+
+protected:
void* m_FileHandle = nullptr; // This is either null or valid
};
+/**
+ * Simple abstraction for a temporary file
+ *
+ * Works like a regular BasicFile but implements a simple mechanism to allow creating
+ * a temporary file for writing in a directory which may later be moved atomically
+ * into the intended location after it has been fully written to.
+ *
+ */
+
+class TemporaryFile : public BasicFile
+{
+public:
+ TemporaryFile() = default;
+ ~TemporaryFile();
+
+ TemporaryFile(const TemporaryFile&) = delete;
+ TemporaryFile& operator=(const TemporaryFile&) = delete;
+
+ void Close();
+ void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec);
+ void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec);
+ const std::filesystem::path& GetPath() const { return m_TempPath; }
+
+private:
+ std::filesystem::path m_TempPath;
+
+ using BasicFile::Open;
+};
+
ZENCORE_API void basicfile_forcelink();
} // namespace zen
diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h
index 1fbda0265..00b987383 100644
--- a/zenstore/include/zenstore/caslog.h
+++ b/zenstore/include/zenstore/caslog.h
@@ -9,9 +9,9 @@
#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zencore/windows.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
-#include <atlfile.h>
#include <functional>
namespace zen {
@@ -47,10 +47,10 @@ private:
static_assert(sizeof(FileHeader) == 64);
private:
- CAtlFile m_File;
- FileHeader m_Header;
- size_t m_RecordSize = 1;
- uint64_t m_AppendOffset = 0;
+ BasicFile m_File;
+ FileHeader m_Header;
+ size_t m_RecordSize = 1;
+ std::atomic<uint64_t> m_AppendOffset = 0;
};
template<typename T>
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index f4439e083..5f567e7fc 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -26,6 +26,11 @@ class IoBuffer;
* to support chunking then a CID may represent a list of chunks which could be concatenated
* to form the referenced chunk.
*
+ * It would likely be possible to implement this mapping in a more efficient way if we
+ * integrate it into the CAS store itself, so we can avoid maintaining copies of large
+ * hashes in multiple locations. This would also allow us to consolidate commit logs etc
+ * which would be more resilient than the current split log scheme
+ *
*/
class CidStore
{