// path: root/src/zenstore/filecas.h  (blob: e933569275713684ef1ebccdc6304050a6fe2e42)
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include <zencore/zencore.h>

#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/thread.h>
#include <zenstore/caslog.h>
#include <zenstore/gc.h>

#include "cas.h"

#include <atomic>
#include <functional>

ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
ZEN_THIRD_PARTY_INCLUDES_END

namespace zen {

// Forward declaration only; BasicFile is not otherwise referenced in the visible part of this header.
class BasicFile;

/** CAS storage strategy that keeps every chunk in a file of its own.
 *
 *  Implements the GcStorage and GcReferenceStore interfaces. Only declarations are
 *  visible in this header; notes below that go beyond what the signatures show are
 *  marked as assumptions and should be confirmed against the implementation file.
 */

struct FileCasStrategy final : public GcStorage, public GcReferenceStore
{
	FileCasStrategy(GcManager& Gc);
	~FileCasStrategy();

	/** Prepare the store rooted at RootDirectory. IsNewStore is supplied by the caller;
	 *  NOTE(review): presumably selects between creating a fresh store and opening an existing one - confirm.
	 */
	void				   Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);

	/** Insert Chunk under the key ChunkHash. Mode defaults to CasStore::InsertMode::kMayBeMovedInPlace. */
	CasStore::InsertResult InsertChunk(IoBuffer				Chunk,
									   const IoHash&		ChunkHash,
									   CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);

	/** Look up the payload stored for ChunkHash and return it as an IoBuffer. */
	IoBuffer			   FindChunk(const IoHash& ChunkHash);

	/** Report whether a chunk keyed by ChunkHash is present. */
	bool				   HaveChunk(const IoHash& ChunkHash);

	/** Filter the in/out key set in place.
	 *  NOTE(review): which keys are kept vs. removed is not visible here - confirm in the implementation.
	 */
	void				   FilterChunks(HashKeySet& InOutChunks);

	/** Visit the chunks named by ChunkHashes, handing each payload to AsyncCallback together with
	 *  an index (presumably the position within ChunkHashes - confirm). OptionalWorkerPool is a
	 *  pointer and, per its name, may be null.
	 */
	bool				   IterateChunks(std::span<IoHash>												   ChunkHashes,
										 const std::function<bool(size_t Index, const IoBuffer& Payload)>& AsyncCallback,
										 WorkerThreadPool*												   OptionalWorkerPool);
	void				   Flush();

	// GcStorage interface
	virtual void		   ScrubStorage(ScrubContext& ScrubCtx) override;
	virtual GcStorageSize  StorageSize() const override;

	// GcReferenceStore interface
	virtual std::string		   GetGcName(GcCtx& Ctx) override;
	virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override;

private:
	void	  MakeIndexSnapshot(bool ResetLog);
	uint64_t  ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
	uint64_t  ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
	LoggerRef Log() { return m_Log; }

	/** Value type of the in-memory index: only the size is tracked per chunk hash. */
	struct IndexEntry
	{
		uint64_t Size = 0;
	};
	using IndexMap = tsl::robin_map<IoHash, IndexEntry, IoHash::Hasher>;

	LoggerRef			  m_Log;
	GcManager&			  m_Gc;
	std::filesystem::path m_RootDirectory;
	RwLock				  m_Lock;
	IndexMap			  m_Index;
	RwLock				  m_ShardLocks[256];  // TODO: these should be spaced out so they don't share cache lines
	std::atomic_uint64_t  m_TotalSize{};
	bool				  m_IsInitialized = false;

	/** Record type stored through m_CasLog (see TCasLogFile<FileCasIndexEntry> below). */
	struct FileCasIndexEntry
	{
		// constexpr static data members are implicitly inline, so the constant can be
		// ODR-used from this header without a separate out-of-class definition.
		static constexpr uint32_t kTombStone = 0x0000'0001;

		/** Returns true when every bit of Flag is set in Flags.
		 *
		 *  The mask must be the requested Flag rather than a fixed constant; masking with
		 *  kTombStone only happened to work because it is the sole flag defined so far.
		 */
		bool IsFlagSet(const uint32_t Flag) const { return (Flags & Flag) == Flag; }

		IoHash	 Key;
		uint32_t Flags = 0;
		uint64_t Size  = 0;
	};
	static bool											   ValidateEntry(const FileCasIndexEntry& Entry, std::string& OutReason);
	static std::vector<FileCasStrategy::FileCasIndexEntry> ScanFolderForCasFiles(const std::filesystem::path& RootDir);

	// Pin the record size so an accidental layout change is caught at compile time.
	// NOTE(review): presumably because these records are persisted by the CAS log - confirm.
	static_assert(sizeof(FileCasIndexEntry) == 32);

	TCasLogFile<FileCasIndexEntry> m_CasLog;
	uint64_t					   m_LogFlushPosition = 0;

	// Picks one of the 256 shard locks using element 19 of the hash
	// (assumes that element is a single byte, so the index is always within m_ShardLocks).
	inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
	void		   IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& Payload)>&& Callback);
	void		   DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
	IoBuffer	   SafeOpenChunk(const IoHash& ChunkHash, uint64_t ExpectedSize);

	/** Helper constructed from a root path and a chunk hash.
	 *  NOTE(review): presumably builds the sharded on-disk path for the chunk in ShardedPath,
	 *  with Shard2len recording a length within that path - confirm in the implementation.
	 */
	struct ShardingHelper
	{
		ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);

		size_t					   Shard2len = 0;
		ExtendablePathBuilder<128> ShardedPath;
	};

	bool UpdateIndex(const IoHash& ChunkHash, uint64_t ChunkSize);

	friend class FileCasReferencePruner;
	friend class FileCasStoreCompactor;
};

// Declared here and defined elsewhere. NOTE(review): the name suggests that referencing this
// function forces the corresponding object file to be linked in - confirm at the definition.
void filecas_forcelink();

}  // namespace zen