1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include <zencore/zencore.h>
#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/thread.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>
#include "cas.h"
#include <atomic>
#include <functional>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
class BasicFile;
/** CAS storage strategy using a file-per-chunk storage strategy
 *
 * Exposes chunk insertion, lookup, filtering, iteration and flushing, and
 * implements the GcStorage / GcReferenceStore interfaces. Internally it keeps
 * an in-memory index (chunk hash -> size), a CAS log of FileCasIndexEntry
 * records, and an array of 256 shard locks selected from the chunk hash.
 */
struct FileCasStrategy final : public GcStorage, public GcReferenceStore
{
	FileCasStrategy(GcManager& Gc);
	~FileCasStrategy();

	/** Prepare the store rooted at RootDirectory. IsNewStore is supplied by the
	 *  caller; NOTE(review): its exact effect is defined in the implementation file.
	 */
	void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);

	/** Insert Chunk under the key ChunkHash. Mode defaults to kMayBeMovedInPlace. */
	CasStore::InsertResult InsertChunk(IoBuffer			   Chunk,
									   const IoHash&		   ChunkHash,
									   CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);

	/** Look up the chunk stored under ChunkHash and return it as an IoBuffer. */
	IoBuffer FindChunk(const IoHash& ChunkHash);

	/** Query whether a chunk is stored under ChunkHash. */
	bool HaveChunk(const IoHash& ChunkHash);

	/** Filter the given key set in place (in/out parameter). */
	void FilterChunks(HashKeySet& InOutChunks);

	/** Invoke AsyncCallback with the index into ChunkHashes and the chunk payload.
	 *  OptionalWorkerPool may be supplied to run the work on a thread pool.
	 *  NOTE(review): the meaning of the callback's and this function's bool
	 *  results is defined in the implementation file - confirm there.
	 */
	bool IterateChunks(std::span<IoHash>													 ChunkHashes,
					   const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
					   WorkerThreadPool*												 OptionalWorkerPool);

	void Flush();

	// GcStorage / GcReferenceStore overrides
	virtual void			   ScrubStorage(ScrubContext& ScrubCtx) override;
	virtual GcStorageSize	   StorageSize() const override;
	virtual std::string		   GetGcName(GcCtx& Ctx) override;
	virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;

private:
	// Index snapshot / log persistence helpers
	void	 MakeIndexSnapshot(bool ResetLog);
	uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
	uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);

	// Accessor for the logger associated with this strategy
	LoggerRef Log() { return m_Log; }

	/** Value stored in the in-memory index for each chunk hash. */
	struct IndexEntry
	{
		uint64_t Size = 0;
	};
	using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;

	LoggerRef			  m_Log;
	GcManager&			  m_Gc;
	std::filesystem::path m_RootDirectory;
	RwLock				  m_Lock;
	IndexMap			  m_Index;
	RwLock				  m_ShardLocks[256];  // TODO: these should be spaced out so they don't share cache lines
	std::atomic_uint64_t  m_TotalSize{};
	bool				  m_IsInitialized = false;

	/** Record type written to / read from the CAS log (see m_CasLog). */
	struct FileCasIndexEntry
	{
		static constexpr uint32_t kTombStone = 0x0000'0001;

		/** True when every bit of Flag is set in Flags.
		 *
		 *  The mask uses the requested Flag rather than a hard-coded kTombStone,
		 *  so the answer is correct for any flag value, not only the tombstone bit.
		 */
		bool IsFlagSet(const uint32_t Flag) const { return (Flags & Flag) == Flag; }

		IoHash	 Key;
		uint32_t Flags = 0;
		uint64_t Size  = 0;
	};

	static bool											   ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason);
	static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir);

	// The log record is required to be exactly 32 bytes
	static_assert(sizeof(FileCasIndexEntry) == 32);

	TCasLogFile<FileCasIndexEntry> m_CasLog;
	uint64_t					   m_LogFlushPosition = 0;

	// Picks a shard lock using byte index 19 of the hash
	// (assumes IoHash::Hash holds 8-bit values so the index stays below 256 - TODO confirm)
	inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }

	void	 IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
	void	 DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
	IoBuffer SafeOpenChunk(const IoHash& ChunkHash, uint64_t ExpectedSize);

	/** Builds ShardedPath from a root path and a chunk hash.
	 *  NOTE(review): Shard2len presumably records the length of the path up to
	 *  and including the second shard level - confirm in the implementation.
	 */
	struct ShardingHelper
	{
		ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);

		size_t					   Shard2len = 0;
		ExtendablePathBuilder<128> ShardedPath;
	};

	bool UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize);

	friend class FileCasReferencePruner;
	friend class FileCasStoreCompactor;
};
// Declaration only; the definition lives outside this header.
// NOTE(review): judging by the name, this is presumably referenced from elsewhere to force
// the file-CAS translation unit to be linked in - confirm against the implementation.
void filecas_forcelink();
} // namespace zen
|