aboutsummaryrefslogtreecommitdiff
path: root/zenstore
diff options
context:
space:
mode:
author Martin Ridgers <[email protected]> 2021-10-11 10:32:13 +0200
committer Martin Ridgers <[email protected]> 2021-10-11 10:32:13 +0200
commit 735b478c1425cc9a1e407bf917dc8daa4ab16b11 (patch)
tree 6e253ee94a5db9244756571084f3f07472e4e2b3 /zenstore
parent Fixed undefined mimalloc symbols (diff)
parent iobuffer: Changed MakeFromTemporaryFile so it accepts a path instead of a cha... (diff)
download zen-735b478c1425cc9a1e407bf917dc8daa4ab16b11.tar.xz
zen-735b478c1425cc9a1e407bf917dc8daa4ab16b11.zip
Merged main
Diffstat (limited to 'zenstore')
-rw-r--r-- zenstore/basicfile.cpp 12
-rw-r--r-- zenstore/filecas.cpp 145
-rw-r--r-- zenstore/filecas.h 2
-rw-r--r-- zenstore/include/zenstore/CAS.h 2
-rw-r--r-- zenstore/include/zenstore/basicfile.h 2
-rw-r--r-- zenstore/zenstore.cpp 2
6 files changed, 157 insertions, 8 deletions
diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp
index f41f04101..cfc77e2a5 100644
--- a/zenstore/basicfile.cpp
+++ b/zenstore/basicfile.cpp
@@ -143,6 +143,12 @@ BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<voi
}
}
+void  // MemoryView convenience overload of Write that reports failure through Ec.
+BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec)
+{
+ Write(Data.GetData(), Data.GetSize(), FileOffset, Ec);  // Unpack the view and forward to the (pointer, size, offset, Ec) overload.
+}
+
void
BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
{
@@ -177,6 +183,12 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::erro
}
void  // MemoryView convenience overload of Write without an error_code out-parameter.
+BasicFile::Write(MemoryView Data, uint64_t FileOffset)
+{
+ Write(Data.GetData(), Data.GetSize(), FileOffset);  // Unpack the view and forward to the (pointer, size, offset) overload.
+}
+
+void
BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
{
std::error_code Ec;
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index ee641b80a..f69ed6bdb 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -9,12 +9,15 @@
#include <zencore/memory.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenstore/basicfile.h>
#include <gsl/gsl-lite.hpp>
+#include <barrier>
#include <filesystem>
#include <functional>
#include <unordered_map>
@@ -86,15 +89,19 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+ const HANDLE ChunkFileHandle = FileRef.FileHandle;
+
auto DeletePayloadFileOnClose = [&] {
// This will cause the file to be deleted when the last handle to it is closed
FILE_DISPOSITION_INFO Fdi{};
Fdi.DeleteFile = TRUE;
- BOOL Success = SetFileInformationByHandle(FileRef.FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+ BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
if (!Success)
{
- ZEN_WARN("Failed to flag temporary payload file for deletion: '{}'", PathFromHandle(FileRef.FileHandle));
+ ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'",
+ PathFromHandle(ChunkFileHandle),
+ GetLastErrorAsString());
}
};
@@ -118,6 +125,28 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return CasStore::InsertResult{.New = false};
}
+ else
+ {
+ if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND))
+ {
+ // Shard directory does not exist
+ }
+ else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
+ {
+ // Shard directory exists, but not the file
+ }
+ else if (hRes == HRESULT_FROM_WIN32(ERROR_SHARING_VIOLATION))
+ {
+ // Sharing violation, likely because we are trying to open a file
+ // which has been renamed on another thread, and the file handle
+ // used to rename it is still open. We handle this case below
+ // instead of here
+ }
+ else
+ {
+ ZEN_INFO("Unexpected error opening file '{}': {}", WideToUtf8(Name.ShardedPath), hRes);
+ }
+ }
}
std::filesystem::path FullPath(Name.ShardedPath.c_str());
@@ -138,7 +167,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
// Try to move file into place
- BOOL Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize);
+ BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
if (!Success)
{
@@ -150,7 +179,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
auto InternalCreateDirectoryHandle = [&] {
return DirHandle.Create(FilePath.c_str(),
GENERIC_READ | GENERIC_WRITE,
- FILE_SHARE_READ | FILE_SHARE_WRITE,
+ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS);
};
@@ -163,6 +192,9 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
if (FAILED(hRes))
{
+ // TODO: we can handle directory creation more intelligently and efficiently than
+ // this currently does
+
CreateDirectories(FilePath.c_str());
hRes = InternalCreateDirectoryHandle();
@@ -173,9 +205,9 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
ThrowSystemException(hRes, "Failed to open shard directory '{}'"_format(FilePath));
}
- // Retry
+ // Retry rename/move
- Success = SetFileInformationByHandle(FileRef.FileHandle, FileRenameInfo, RenameInfo, BufferSize);
+ Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize);
}
if (Success)
@@ -183,8 +215,17 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return CasStore::InsertResult{.New = true};
}
+ const DWORD LastError = GetLastError();
+
+ if ((LastError == ERROR_FILE_EXISTS) || (LastError == ERROR_ALREADY_EXISTS))
+ {
+ DeletePayloadFileOnClose();
+
+ return CasStore::InsertResult{.New = false};
+ }
+
ZEN_WARN("rename of CAS payload file failed ('{}'), falling back to regular write for insert of {}",
- GetLastErrorAsString(),
+ GetSystemErrorAsString(LastError),
ChunkHash);
DeletePayloadFileOnClose();
@@ -445,4 +486,94 @@ FileCasStrategy::GarbageCollect(GcContext& GcCtx)
ZEN_UNUSED(GcCtx);
}
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+// Inserts a payload backed by a temporary file (created through
+// IoBufferBuilder::MakeFromTemporaryFile) into a FileCasStrategy and checks
+// that the first insert of that chunk is reported as new. A multi-threaded
+// stress variant is kept below but is compiled out.
+TEST_CASE("cas.file.move")
+{
+    using namespace fmt::literals;
+
+    // Root the scratch directory under the system temp location instead of a
+    // hard-coded "d:\\" path, so the test does not depend on a particular
+    // drive letter existing on the machine running it. The payload files and
+    // the CAS root both live under this single directory.
+    ScopedTemporaryDirectory TempDir{std::filesystem::temp_directory_path() / "filecas_testdir"};
+
+    CasStoreConfiguration CasConfig;
+    CasConfig.RootDirectory = TempDir.Path() / "cas";
+
+    FileCasStrategy FileCas(CasConfig);
+
+    {
+        std::filesystem::path Payload1Path{TempDir.Path() / "payload_1"};
+
+        // The hash is computed from the very buffer that gets written out, so
+        // it matches the file contents regardless of what the buffer holds.
+        IoBuffer ZeroBytes{1024 * 1024};
+        IoHash ZeroHash = IoHash::HashBuffer(ZeroBytes);
+
+        BasicFile PayloadFile;
+        PayloadFile.Open(Payload1Path, true);
+        PayloadFile.Write(ZeroBytes, 0);
+        PayloadFile.Close();
+
+        IoBuffer Payload1 = IoBufferBuilder::MakeFromTemporaryFile(Payload1Path);
+
+        // First insert of this hash into a fresh store: expected to be new.
+        CasStore::InsertResult Result = FileCas.InsertChunk(Payload1, ZeroHash);
+        CHECK_EQ(Result.New, true);
+    }
+
+# if 0
+    // Disabled stress test: kWorkers threads insert kItemCount payloads each,
+    // synchronising on a barrier before every insert.
+    //
+    // NOTE(review): PayloadHashes is filled worker-major but indexed with `i`
+    // only, so every worker uses the hashes computed for w == 0. Presumably
+    // this relies on payload `i` having identical content for every worker;
+    // confirm that IoBuffer{1024} yields deterministic bytes beyond the first
+    // int before re-enabling.
+    SUBCASE("stresstest")
+    {
+        std::vector<IoHash> PayloadHashes;
+
+        const int kWorkers = 64;
+        const int kItemCount = 128;
+
+        for (int w = 0; w < kWorkers; ++w)
+        {
+            for (int i = 0; i < kItemCount; ++i)
+            {
+                IoBuffer Payload{1024};
+                *reinterpret_cast<int*>(Payload.MutableData()) = i;
+                PayloadHashes.push_back(IoHash::HashBuffer(Payload));
+
+                std::filesystem::path PayloadPath{TempDir.Path() / "payload_{}_{}"_format(w, i)};
+                WriteFile(PayloadPath, Payload);
+            }
+        }
+
+        std::barrier Sync{kWorkers};
+
+        auto PopulateAll = [&](int w) {
+            std::vector<IoBuffer> Buffers;
+
+            for (int i = 0; i < kItemCount; ++i)
+            {
+                std::filesystem::path PayloadPath{TempDir.Path() / "payload_{}_{}"_format(w, i)};
+                IoBuffer Payload = IoBufferBuilder::MakeFromTemporaryFile(PayloadPath);
+                Buffers.push_back(Payload);
+                Sync.arrive_and_wait();
+                CasStore::InsertResult Result = FileCas.InsertChunk(Payload, PayloadHashes[i]);
+            }
+        };
+
+        std::vector<std::jthread> Threads;
+
+        for (int i = 0; i < kWorkers; ++i)
+        {
+            Threads.push_back(std::jthread(PopulateAll, i));
+        }
+
+        for (std::jthread& Thread : Threads)
+        {
+            Thread.join();
+        }
+    }
+# endif
+}
+
+#endif
+
+void  // Intentionally empty; called from zenstore_forcelinktests() in zenstore.cpp.
+filecas_forcelink()
+{  // NOTE(review): presumably exists only to give the linker a reference into this translation unit so its tests are kept; confirm.
+}
+
} // namespace zen
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index db21502c6..14314ce52 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -57,4 +57,6 @@ private:
};
};
+void filecas_forcelink();
+
} // namespace zen
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index d0698df7f..86e7e78d9 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -21,7 +21,7 @@ namespace zen {
struct CasStoreConfiguration
{
- // Root directory for CAS store -- if not specified a default folder will be assigned in 'Documents\zen'
+ // Root directory for CAS store
std::filesystem::path RootDirectory;
// Threshold below which values are considered 'tiny' and managed using the 'tiny values' strategy
diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h
index fad4a33e1..7ae35dea6 100644
--- a/zenstore/include/zenstore/basicfile.h
+++ b/zenstore/include/zenstore/basicfile.h
@@ -36,6 +36,8 @@ public:
void Read(void* Data, uint64_t Size, uint64_t FileOffset);
void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
+ void Write(MemoryView Data, uint64_t FileOffset);
+ void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
void Flush();
diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp
index cd16e5634..d852fa64b 100644
--- a/zenstore/zenstore.cpp
+++ b/zenstore/zenstore.cpp
@@ -4,6 +4,7 @@
#include <zenstore/CAS.h>
#include <zenstore/basicfile.h>
+#include "filecas.h"
namespace zen {
@@ -12,6 +13,7 @@ zenstore_forcelinktests()
{
basicfile_forcelink();
CAS_forcelink();
+ filecas_forcelink();
}
} // namespace zen