diff options
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zen/cmds/admin_cmd.cpp | 32 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 25 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 4 | ||||
| -rw-r--r-- | src/zencore/include/zencore/windows.h | 1 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 1213 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 56 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore-test/zenstore-test.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 31 | ||||
| -rw-r--r-- | src/zenstore/caslog.cpp | 26 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 53 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/blockstore.h | 11 | ||||
| -rw-r--r-- | src/zenstore/zenstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/basicfile.cpp | 94 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 31 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenutil.h | 6 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 19 |
18 files changed, 1076 insertions, 534 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 486ee0c6d..d819bf28c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ - Improvement: Updated branding to be consistent with current working name ("Unreal Zen Storage Server" etc) - Improvement: GcScheduler will now cancel any running GC when it shuts down. - Current GC is rather limited in *when* it reacts to cancel of GC. GCv2 is more responsive. +- Improvement: Cache metadata snapshot (`*.uidx`/`.zen_manifest` and now `*.meta`) files are read and written in a streaming fashion instead of all-at-once to/from memory like before. This eliminates some spiky memory usage patterns during garbage collection and also improves overall performance considerably. +- Improvement: `zen copy-state` command now utilizes block cloning where possible (i.e on ReFS volumes) for near-instant snapshots ## 0.2.35 - Bugfix: Fix timeout calculation for semtimedop call diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp index 3422c6880..86a180d54 100644 --- a/src/zen/cmds/admin_cmd.cpp +++ b/src/zen/cmds/admin_cmd.cpp @@ -550,21 +550,9 @@ static void Copy(const std::filesystem::path& Source, const std::filesystem::path& Target) { CreateDirectories(Target.parent_path()); - BasicFile SourceFile; - SourceFile.Open(Source, BasicFile::Mode::kRead); - BasicFile TargetFile; - TargetFile.Open(Target, BasicFile::Mode::kTruncate); - uint64_t Size = SourceFile.FileSize(); - uint64_t Offset = 0; - std::vector<uint8_t> Buffer(Min(size_t(Size), size_t(65536u))); - while (Offset < Size) - { - uint64_t CopyCount = Min<uint64_t>(Size - Offset, size_t(Buffer.size())); - SourceFile.Read(Buffer.data(), CopyCount, Offset); - TargetFile.Write(Buffer.data(), CopyCount, Offset); - Offset += CopyCount; - } - TargetFile.Flush(); + + CopyFileOptions Options; + CopyFile(Source, Target, Options); } static bool @@ -574,8 +562,11 @@ TryCopy(const std::filesystem::path& Source, const std::filesystem::path& Target { return false; } - Copy(Source, Target); - return true; + + CreateDirectories(Target.parent_path()); + + CopyFileOptions Options; + return CopyFile(Source, Target, Options); } int @@ -630,6 +621,8 @@ CopyStateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::filesystem::path BucketName = BucketPath.filename(); std::filesystem::path TargetBucketPath = TargetNamespacePath / BucketName; + // TODO: make these use file naming helpers from cache implementation? + std::filesystem::path ManifestPath = BucketPath / "zen_manifest"; std::filesystem::path TargetManifestPath = TargetBucketPath / "zen_manifest"; if (TryCopy(ManifestPath, TargetManifestPath)) @@ -646,6 +639,11 @@ CopyStateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::filesystem::path IndexPath = BucketPath / IndexName; std::filesystem::path TargetIndexPath = TargetBucketPath / IndexName; TryCopy(IndexPath, TargetIndexPath); + + std::filesystem::path MetaName = fmt::format("{}.{}", BucketName.string(), "meta"); + std::filesystem::path MetaPath = BucketPath / MetaName; + std::filesystem::path TargetMetaPath = TargetBucketPath / MetaName; + TryCopy(MetaPath, TargetMetaPath); } } } diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 85f7690bd..e9b147be5 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -334,11 +334,17 @@ SupportsBlockRefCounting(std::filesystem::path Path) #endif // ZEN_PLATFORM_WINDOWS } -bool +static bool CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) { #if ZEN_PLATFORM_WINDOWS - windows::Handle FromFile(CreateFileW(FromPath.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); + windows::Handle FromFile(CreateFileW(FromPath.c_str(), + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + nullptr, + OPEN_EXISTING, + 0, + nullptr)); if (FromFile == INVALID_HANDLE_VALUE) { FromFile.Detach(); @@ -402,8 +408,10 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) FILE_DISPOSITION_INFO FileDisposition = {TRUE}; if (!SetFileInformationByHandle(TargetFile, FileDispositionInfo, &FileDisposition, sizeof FileDisposition)) { + const DWORD ErrorCode = ::GetLastError(); TargetFile.Close(); DeleteFileW(ToPath.c_str()); + SetLastError(ErrorCode); return false; } @@ -532,6 +540,19 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) #endif // ZEN_PLATFORM_WINDOWS } +void +CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options, std::error_code& OutErrorCode) +{ + OutErrorCode.clear(); + + bool Success = CopyFile(FromPath, ToPath, Options); + + if (!Success) + { + OutErrorCode = MakeErrorCodeFromLastError(); + } +} + bool CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options) { diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index 07eb72879..233941479 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -86,6 +86,10 @@ struct CopyFileOptions }; ZENCORE_API bool CopyFile(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options); +ZENCORE_API void CopyFile(std::filesystem::path FromPath, + std::filesystem::path ToPath, + const CopyFileOptions& Options, + std::error_code& OutError); ZENCORE_API void CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const CopyFileOptions& Options); ZENCORE_API bool SupportsBlockRefCounting(std::filesystem::path Path); diff --git a/src/zencore/include/zencore/windows.h b/src/zencore/include/zencore/windows.h index 0943a85ea..14026fef1 100644 --- a/src/zencore/include/zencore/windows.h +++ b/src/zencore/include/zencore/windows.h @@ -24,6 +24,7 @@ struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax erro # include <windows.h> # undef GetObject # undef SendMessage +# undef CopyFile ZEN_THIRD_PARTY_INCLUDES_END diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 6ab3c7746..0767086ce 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -25,12 +25,6 @@ namespace { #pragma pack(push) #pragma pack(1) - // We use this to indicate if a on disk bucket needs wiping - // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scrable the references - // to block items. - // See: https://github.com/EpicGames/zen/pull/299 - static const uint32_t CurrentDiskBucketVersion = 1; - struct CacheBucketIndexHeader { static constexpr uint32_t ExpectedMagic = 0x75696478; // 'uidx'; @@ -48,12 +42,75 @@ namespace { { return XXH32(&Header.Magic, sizeof(CacheBucketIndexHeader) - sizeof(uint32_t), 0xC0C0'BABA); } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (PayloadAlignment == 0) + { + return false; + } + + return true; + } }; static_assert(sizeof(CacheBucketIndexHeader) == 32); + struct BucketMetaHeader + { + static constexpr uint32_t ExpectedMagic = 0x61'74'65'6d; // 'meta'; + static constexpr uint32_t Version1 = 1; + static constexpr uint32_t CurrentVersion = Version1; + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint64_t EntryCount = 0; + uint64_t LogPosition = 0; + uint32_t Padding = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const BucketMetaHeader& Header) + { + return XXH32(&Header.Magic, sizeof(BucketMetaHeader) - sizeof(uint32_t), 0xC0C0'BABA); + } + + bool IsValid() const + { + if (Magic != ExpectedMagic) + { + return false; + } + + if (Checksum != ComputeChecksum(*this)) + { + return false; + } + + if (Padding != 0) + { + return false; + } + + return true; + } + }; + + static_assert(sizeof(BucketMetaHeader) == 32); + #pragma pack(pop) + ////////////////////////////////////////////////////////////////////////// + template<typename T> void Reset(T& V) { @@ -63,15 +120,16 @@ namespace { const char* IndexExtension = ".uidx"; const char* LogExtension = ".slog"; + const char* MetaExtension = ".meta"; std::filesystem::path GetIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { return BucketDir / (BucketName + IndexExtension); } - std::filesystem::path GetTempIndexPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + std::filesystem::path GetMetaPath(const std::filesystem::path& BucketDir, const std::string& BucketName) { - return BucketDir / (BucketName + ".tmp"); + return BucketDir / (BucketName + MetaExtension); } std::filesystem::path GetLogPath(const std::filesystem::path& BucketDir, const std::string& BucketName) @@ -79,6 +137,12 @@ namespace { return BucketDir / (BucketName + LogExtension); } + std::filesystem::path GetManifestPath(const std::filesystem::path& BucketDir, const std::string& BucketName) + { + ZEN_UNUSED(BucketName); + return BucketDir / "zen_manifest"; + } + bool ValidateCacheBucketIndexEntry(const DiskIndexEntry& Entry, std::string& OutReason) { if (Entry.Key == IoHash::Zero) @@ -147,22 +211,430 @@ namespace { } // namespace namespace fs = std::filesystem; +using namespace std::literals; + +class BucketManifestSerializer +{ + using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; + using BucketMetaData = ZenCacheDiskLayer::CacheBucket::BucketMetaData; + + using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; + using BucketPayload = ZenCacheDiskLayer::CacheBucket::BucketPayload; + +public: + // We use this to indicate if a on disk bucket needs wiping + // In version 0.2.5 -> 0.2.11 there was a GC corruption bug that would scramble the references + // to block items. + // See: https://github.com/EpicGames/zen/pull/299 + static inline const uint32_t CurrentDiskBucketVersion = 1; + + bool Open(std::filesystem::path ManifestPath) + { + Manifest = LoadCompactBinaryObject(ManifestPath); + return !!Manifest; + } + + Oid GetBucketId() const { return Manifest["BucketId"sv].AsObjectId(); } + + bool IsCurrentVersion(uint32_t& OutVersion) const + { + OutVersion = Manifest["Version"sv].AsUInt32(0); + return OutVersion == CurrentDiskBucketVersion; + } + + void ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + Oid GenerateNewManifest(std::filesystem::path ManifestPath); + + IoBuffer MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount); + uint64_t GetSidecarSize() const { return m_ManifestEntryCount * sizeof(ManifestData); } + void WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + bool ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads); + + IoBuffer MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas); + + CbObject Manifest; + +private: + CbObject LoadCompactBinaryObject(const fs::path& Path) + { + FileContents Result = ReadFile(Path); + + if (!Result.ErrorCode) + { + IoBuffer Buffer = Result.Flatten(); + if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + { + return zen::LoadCompactBinaryObject(Buffer); + } + } + + return CbObject(); + } + + uint64_t m_ManifestEntryCount = 0; + + struct ManifestData + { + IoHash Key; // 20 + AccessTime Timestamp; // 4 + IoHash RawHash; // 20 + uint32_t Padding_0; // 4 + size_t RawSize; // 8 + uint64_t Padding_1; // 8 + }; + + static_assert(sizeof(ManifestData) == 64); +}; -static CbObject -LoadCompactBinaryObject(const fs::path& Path) +void +BucketManifestSerializer::ParseManifest(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path ManifestPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) { - FileContents Result = ReadFile(Path); + if (Manifest["UsingMetaFile"sv].AsBool()) + { + ReadSidecarFile(Bucket, GetMetaPath(Bucket.m_BucketDir, Bucket.m_BucketName), Index, AccessTimes, Payloads); + + return; + } + + ZEN_TRACE_CPU("Z$::ParseManifest"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + std::vector<PayloadIndex> KeysIndexes; + KeysIndexes.reserve(Count); - if (!Result.ErrorCode) + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + for (CbFieldView& KeyView : KeyArray) { - IoBuffer Buffer = Result.Flatten(); - if (CbValidateError Error = ValidateCompactBinary(Buffer, CbValidateMode::All); Error == CbValidateError::None) + if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) { - return LoadCompactBinaryObject(Buffer); + KeysIndexes.push_back(It.value()); + } + else + { + KeysIndexes.push_back(PayloadIndex()); + } + } + + size_t KeyIndexOffset = 0; + CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); + for (CbFieldView& TimeStampView : TimeStampArray) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } + } + + KeyIndexOffset = 0; + CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); + CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); + if (RawHashArray.Num() == RawSizeArray.Num()) + { + auto RawHashIt = RawHashArray.CreateViewIterator(); + auto RawSizeIt = RawSizeArray.CreateViewIterator(); + while (RawHashIt != CbFieldViewIterator()) + { + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + + if (KeyIndex) + { + uint64_t RawSize = RawSizeIt.AsUInt64(); + IoHash RawHash = RawHashIt.AsHash(); + if (RawSize != 0 || RawHash != IoHash::Zero) + { + BucketPayload& Payload = Payloads[KeyIndex]; + Bucket.SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); + } + } + + RawHashIt++; + RawSizeIt++; } } + else + { + ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); + } +} - return CbObject(); +Oid +BucketManifestSerializer::GenerateNewManifest(std::filesystem::path ManifestPath) +{ + const Oid BucketId = Oid::NewOid(); + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Manifest = Writer.Save(); + WriteFile(ManifestPath, Manifest.GetBuffer().AsIoBuffer()); + + return BucketId; +} + +IoBuffer +BucketManifestSerializer::MakeManifest(const Oid& BucketId, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + using namespace std::literals; + + ZEN_TRACE_CPU("Z$::MakeManifest"); + + size_t ItemCount = Index.size(); + + // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth + // And we don't need to reallocate the underlying buffer in almost every case + const size_t EstimatedSizePerItem = 54u; + const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); + CbObjectWriter Writer(ReserveSize); + + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + + if (!Index.empty()) + { + Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); + Writer.BeginArray("Keys"sv); + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + Writer.AddHash(Key); + } + Writer.EndArray(); + + Writer.BeginArray("Timestamps"sv); + for (auto& Kv : Index) + { + GcClock::Tick AccessTime = AccessTimes[Kv.second]; + Writer.AddInteger(AccessTime); + } + Writer.EndArray(); + + if (!MetaDatas.empty()) + { + Writer.BeginArray("RawHash"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); + } + else + { + Writer.AddHash(IoHash::Zero); + } + } + Writer.EndArray(); + + Writer.BeginArray("RawSize"sv); + for (auto& Kv : Index) + { + const ZenCacheDiskLayer::CacheBucket::BucketPayload& Payload = Payloads[Kv.second]; + if (Payload.MetaData) + { + Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); + } + else + { + Writer.AddInteger(0); + } + } + Writer.EndArray(); + } + } + + Manifest = Writer.Save(); + return Manifest.GetBuffer().AsIoBuffer(); +} + +IoBuffer +BucketManifestSerializer::MakeSidecarManifest(const Oid& BucketId, uint64_t EntryCount) +{ + m_ManifestEntryCount = EntryCount; + + CbObjectWriter Writer; + Writer << "BucketId"sv << BucketId; + Writer << "Version"sv << CurrentDiskBucketVersion; + Writer << "Count"sv << EntryCount; + Writer << "UsingMetaFile"sv << true; + Manifest = Writer.Save(); + + return Manifest.GetBuffer().AsIoBuffer(); +} + +bool +BucketManifestSerializer::ReadSidecarFile(ZenCacheDiskLayer::CacheBucket& Bucket, + std::filesystem::path SidecarPath, + ZenCacheDiskLayer::CacheBucket::IndexMap& Index, + std::vector<AccessTime>& AccessTimes, + std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads) +{ + ZEN_ASSERT(AccessTimes.size() == Payloads.size()); + + std::error_code Ec; + + BasicFile SidecarFile; + SidecarFile.Open(SidecarPath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to open sidecar file '{}'", SidecarPath)); + } + + uint64_t FileSize = SidecarFile.FileSize(); + + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid sidecar file '{}'", SidecarPath); }); + + if (FileSize < sizeof(BucketMetaHeader)) + { + return false; + } + + BasicFileBuffer Sidecar(SidecarFile, 128 * 1024); + + BucketMetaHeader Header; + Sidecar.Read(&Header, sizeof Header, 0); + + if (!Header.IsValid()) + { + return false; + } + + if (Header.Version != BucketMetaHeader::Version1) + { + return false; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(BucketMetaHeader))) / sizeof(ManifestData); + if (Header.EntryCount > ExpectedEntryCount) + { + return false; + } + + InvalidGuard.Dismiss(); + + uint64_t RemainingEntryCount = ExpectedEntryCount; + uint64_t EntryCount = 0; + uint64_t CurrentReadOffset = sizeof(Header); + + while (RemainingEntryCount--) + { + const ManifestData* Entry = Sidecar.MakeView<ManifestData>(CurrentReadOffset); + CurrentReadOffset += sizeof(ManifestData); + + if (auto It = Index.find(Entry->Key); It != Index.end()) + { + PayloadIndex PlIndex = It.value(); + + ZEN_ASSERT(size_t(PlIndex) <= Payloads.size()); + + ZenCacheDiskLayer::CacheBucket::BucketPayload& PayloadEntry = Payloads[PlIndex]; + + AccessTimes[PlIndex] = Entry->Timestamp; + + if (Entry->RawSize && Entry->RawHash != IoHash::Zero) + { + Bucket.SetMetaData(PayloadEntry, BucketMetaData{.RawSize = Entry->RawSize, .RawHash = Entry->RawHash}); + } + } + + EntryCount++; + } + + ZEN_ASSERT(EntryCount == ExpectedEntryCount); + + return true; +} + +void +BucketManifestSerializer::WriteSidecarFile(const std::filesystem::path& SidecarPath, + uint64_t SnapshotLogPosition, + ZenCacheDiskLayer::CacheBucket::IndexMap&& Index, + std::vector<AccessTime>&& AccessTimes, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketPayload>& Payloads, + const std::vector<ZenCacheDiskLayer::CacheBucket::BucketMetaData>& MetaDatas) +{ + BucketMetaHeader Header; + Header.EntryCount = m_ManifestEntryCount; + Header.LogPosition = SnapshotLogPosition; + Header.Checksum = Header.ComputeChecksum(Header); + + std::error_code Ec; + + TemporaryFile SidecarFile; + SidecarFile.CreateTemporary(SidecarPath.parent_path(), Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed creating '{}'", SidecarFile.GetPath())); + } + + SidecarFile.Write(&Header, sizeof Header, 0); + + // TODO: make this batching for better performance + { + uint64_t WriteOffset = sizeof Header; + + BasicFileWriter SidecarWriter(SidecarFile, 128 * 1024); + + for (auto& Kv : Index) + { + const IoHash& Key = Kv.first; + const PayloadIndex PlIndex = Kv.second; + + IoHash RawHash = IoHash::Zero; + uint64_t RawSize = 0; + + if (const MetaDataIndex MetaIndex = Payloads[PlIndex].MetaData) + { + RawHash = MetaDatas[MetaIndex].RawHash; + RawSize = MetaDatas[MetaIndex].RawSize; + } + + ManifestData ManifestEntry = + {.Key = Key, .Timestamp = AccessTimes[PlIndex], .RawHash = RawHash, .Padding_0 = 0, .RawSize = RawSize, .Padding_1 = 0}; + + SidecarWriter.Write(&ManifestEntry, sizeof ManifestEntry, WriteOffset); + + WriteOffset += sizeof ManifestEntry; + } + } + + SidecarFile.MoveTemporaryIntoPlace(SidecarPath, Ec); + + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to move '{}' into '{}'", SidecarFile.GetPath(), SidecarPath)); + } } ////////////////////////////////////////////////////////////////////////// @@ -207,167 +679,67 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo CreateDirectories(m_BucketDir); - std::filesystem::path ManifestPath{m_BucketDir / "zen_manifest"}; + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); bool IsNew = false; - CbObject Manifest = LoadCompactBinaryObject(ManifestPath); + BucketManifestSerializer ManifestReader; - if (Manifest) + if (ManifestReader.Open(ManifestPath)) { - m_BucketId = Manifest["BucketId"sv].AsObjectId(); + m_BucketId = ManifestReader.GetBucketId(); if (m_BucketId == Oid::Zero) { return false; } - const uint32_t Version = Manifest["Version"sv].AsUInt32(0); - if (Version != CurrentDiskBucketVersion) + + uint32_t Version = 0; + if (ManifestReader.IsCurrentVersion(/* out */ Version) == false) { - ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", BucketDir, Version, CurrentDiskBucketVersion); + ZEN_INFO("Wiping bucket '{}', found version {}, required version {}", + BucketDir, + Version, + BucketManifestSerializer::CurrentDiskBucketVersion); IsNew = true; } } else if (AllowCreate) { - m_BucketId.Generate(); - - CbObjectWriter Writer; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; - Manifest = Writer.Save(); - WriteFile(m_BucketDir / "zen_manifest", Manifest.GetBuffer().AsIoBuffer()); - IsNew = true; + m_BucketId = ManifestReader.GenerateNewManifest(ManifestPath); + IsNew = true; } else { return false; } - OpenLog(IsNew); + InitializeIndexFromDisk(IsNew); - if (!IsNew) + if (IsNew) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenOrCreate::Manifest"); - - Stopwatch Timer; - const auto _ = - MakeGuard([&] { ZEN_INFO("read store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); - if (Count != 0) - { - std::vector<PayloadIndex> KeysIndexes; - KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); - for (CbFieldView& KeyView : KeyArray) - { - if (auto It = m_Index.find(KeyView.AsHash()); It != m_Index.end()) - { - KeysIndexes.push_back(It.value()); - } - else - { - KeysIndexes.push_back(PayloadIndex()); - } - } - size_t KeyIndexOffset = 0; - CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) - { - m_AccessTimes[KeyIndex] = TimeStampView.AsInt64(); - } - } - KeyIndexOffset = 0; - CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); - CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) - { - auto RawHashIt = RawHashArray.CreateViewIterator(); - auto RawSizeIt = RawSizeArray.CreateViewIterator(); - while (RawHashIt != CbFieldViewIterator()) - { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - - if (KeyIndex) - { - uint64_t RawSize = RawSizeIt.AsUInt64(); - IoHash RawHash = RawHashIt.AsHash(); - if (RawSize != 0 || RawHash != IoHash::Zero) - { - BucketPayload& Payload = m_Payloads[KeyIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - - RawHashIt++; - RawSizeIt++; - } - } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } - } - - ////// Legacy format read - { - for (CbFieldView Entry : Manifest["Timestamps"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_AccessTimes.size()); - m_AccessTimes[EntryIndex] = Obj["LastAccess"sv].AsInt64(); - } - } - for (CbFieldView Entry : Manifest["RawInfo"sv]) - { - const CbObjectView Obj = Entry.AsObjectView(); - const IoHash Key = Obj["Key"sv].AsHash(); - if (auto It = m_Index.find(Key); It != m_Index.end()) - { - size_t EntryIndex = It.value(); - ZEN_ASSERT_SLOW(EntryIndex < m_Payloads.size()); - - const IoHash RawHash = Obj["RawHash"sv].AsHash(); - const uint64_t RawSize = Obj["RawSize"sv].AsUInt64(); - - if (RawHash == IoHash::Zero || RawSize == 0) - { - ZEN_SCOPED_ERROR("detected bad index entry in index - {}", EntryIndex); - } - - BucketPayload& Payload = m_Payloads[EntryIndex]; - SetMetaData(Payload, BucketMetaData{.RawSize = RawSize, .RawHash = RawHash}); - } - } - } + return true; } + ManifestReader.ParseManifest(*this, ManifestPath, m_Index, m_AccessTimes, m_Payloads); + return true; } void -ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeIndexSnapshot"); + ZEN_TRACE_CPU("Z$::Disk::Bucket::WriteIndexSnapshot"); - uint64_t LogCount = m_SlogFile.GetLogCount(); + const uint64_t LogCount = m_SlogFile.GetLogCount(); if (m_LogFlushPosition == LogCount) { return; } ZEN_DEBUG("writing store snapshot for '{}'", m_BucketDir); - uint64_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { + const uint64_t EntryCount = m_Index.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { ZEN_INFO("wrote store snapshot for '{}' containing {} entries in {}", m_BucketDir, EntryCount, @@ -376,42 +748,11 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() namespace fs = std::filesystem; - fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); - fs::path STmpIndexPath = GetTempIndexPath(m_BucketDir, m_BucketName); - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) - { - ZEN_WARN("snapshot failed to clean up temp snapshot at {}, reason: '{}'", STmpIndexPath, Ec.message()); - return; - } - } + fs::path IndexPath = GetIndexPath(m_BucketDir, m_BucketName); try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, STmpIndexPath); - } - - // Write the current state of the location map to a new index state - std::vector<DiskIndexEntry> Entries; - Entries.resize(m_Index.size()); - - { - uint64_t EntryIndex = 0; - for (auto& Entry : m_Index) - { - DiskIndexEntry& IndexEntry = Entries[EntryIndex++]; - IndexEntry.Key = Entry.first; - IndexEntry.Location = m_Payloads[Entry.second].Location; - } - } - - uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + Entries.size() * sizeof(DiskIndexEntry); + const uint64_t IndexSize = sizeof(CacheBucketIndexHeader) + EntryCount * sizeof(DiskIndexEntry); std::error_code Error; DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); if (Error) @@ -431,45 +772,67 @@ ZenCacheDiskLayer::CacheBucket::MakeIndexSnapshot(const std::function<uint64_t() fmt::format("not enough free disk space in '{}' to save index of size {}", m_BucketDir, NiceBytes(IndexSize))); } - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kTruncate); - CacheBucketIndexHeader Header = {.EntryCount = Entries.size(), - .LogPosition = LogCount, - .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + TemporaryFile ObjectIndexFile; + std::error_code Ec; + ObjectIndexFile.CreateTemporary(m_BucketDir, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); + } - Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); - ObjectIndexFile.Write(&Header, sizeof(CacheBucketIndexHeader), 0); - ObjectIndexFile.Write(Entries.data(), Entries.size() * sizeof(DiskIndexEntry), sizeof(CacheBucketIndexHeader)); - ObjectIndexFile.Flush(); - ObjectIndexFile.Close(); - EntryCount = Entries.size(); - m_LogFlushPosition = LogCount; - } - catch (std::exception& Err) - { - ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + { + // This is in a separate scope just to ensure IndexWriter goes out + // of scope before the file is flushed/closed, in order to ensure + // all data is written to the file + BasicFileWriter IndexWriter(ObjectIndexFile, 128 * 1024); + + CacheBucketIndexHeader Header = {.EntryCount = EntryCount, + .LogPosition = LogCount, + .PayloadAlignment = gsl::narrow<uint32_t>(m_Configuration.PayloadAlignment)}; + + Header.Checksum = CacheBucketIndexHeader::ComputeChecksum(Header); + IndexWriter.Write(&Header, sizeof(CacheBucketIndexHeader), 0); + + uint64_t IndexWriteOffset = sizeof(CacheBucketIndexHeader); - // Restore any previous snapshot + for (auto& Entry : m_Index) + { + DiskIndexEntry IndexEntry; + IndexEntry.Key = Entry.first; + IndexEntry.Location = m_Payloads[Entry.second].Location; + IndexWriter.Write(&IndexEntry, sizeof(DiskIndexEntry), IndexWriteOffset); - if (fs::is_regular_file(STmpIndexPath)) + IndexWriteOffset += sizeof(DiskIndexEntry); + } + + IndexWriter.Flush(); + } + + ObjectIndexFile.Flush(); + ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); + if (Ec) { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(STmpIndexPath, IndexPath, Ec); - if (Ec) + std::filesystem::path TempFilePath = ObjectIndexFile.GetPath(); + ZEN_WARN("snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", TempFilePath, IndexPath, Ec.message()); + + if (std::filesystem::is_regular_file(TempFilePath)) { - ZEN_WARN("snapshot failed to restore old snapshot from {}, reason: '{}'", STmpIndexPath, Ec.message()); + if (!std::filesystem::remove(TempFilePath, Ec) || Ec) + { + ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", TempFilePath, Ec.message()); + } } } - } - if (fs::is_regular_file(STmpIndexPath)) - { - std::error_code Ec; - if (!fs::remove(STmpIndexPath, Ec) || Ec) + else { - ZEN_WARN("snapshot failed to remove temporary file {}, reason: '{}'", STmpIndexPath, Ec.message()); + // We must only update the log flush position once the snapshot write succeeds + m_LogFlushPosition = LogCount; } } + catch (std::exception& Err) + { + ZEN_WARN("snapshot FAILED, reason: '{}'", Err.what()); + } } uint64_t @@ -477,77 +840,88 @@ ZenCacheDiskLayer::CacheBucket::ReadIndexFile(const std::filesystem::path& Index { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadIndexFile"); - if (std::filesystem::is_regular_file(IndexPath)) + if (!std::filesystem::is_regular_file(IndexPath)) { - BasicFile ObjectIndexFile; - ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); - uint64_t Size = ObjectIndexFile.FileSize(); - if (Size >= sizeof(CacheBucketIndexHeader)) - { - CacheBucketIndexHeader Header; - ObjectIndexFile.Read(&Header, sizeof(Header), 0); - if ((Header.Magic == CacheBucketIndexHeader::ExpectedMagic) && - (Header.Checksum == CacheBucketIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0)) - { - switch (Header.Version) - { - case CacheBucketIndexHeader::Version2: - { - uint64_t ExpectedEntryCount = (Size - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); - if (Header.EntryCount > ExpectedEntryCount) - { - break; - } - size_t EntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", - IndexPath, - EntryCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); + return 0; + } - m_Configuration.PayloadAlignment = Header.PayloadAlignment; + auto InvalidGuard = MakeGuard([&] { ZEN_WARN("skipping invalid index file '{}'", IndexPath); }); - std::vector<DiskIndexEntry> Entries; - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), - Header.EntryCount * sizeof(DiskIndexEntry), - sizeof(CacheBucketIndexHeader)); + BasicFile ObjectIndexFile; + ObjectIndexFile.Open(IndexPath, BasicFile::Mode::kRead); + uint64_t FileSize = ObjectIndexFile.FileSize(); + if (FileSize < sizeof(CacheBucketIndexHeader)) + { + return 0; + } - m_Payloads.reserve(Header.EntryCount); - m_Index.reserve(Header.EntryCount); + CacheBucketIndexHeader Header; + ObjectIndexFile.Read(&Header, sizeof(Header), 0); - std::string InvalidEntryReason; - for (const DiskIndexEntry& Entry : Entries) - { - if (!ValidateCacheBucketIndexEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Entry.Location}); - m_Index.insert_or_assign(Entry.Key, EntryIndex); - EntryCount++; - } - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) - { - m_FirstReferenceIndex.resize(m_Payloads.size()); - } - OutVersion = CacheBucketIndexHeader::Version2; - return Header.LogPosition; - } - break; - default: - break; - } - } + if (!Header.IsValid()) + { + return 0; + } + + if (Header.Version != CacheBucketIndexHeader::Version2) + { + return 0; + } + + const uint64_t ExpectedEntryCount = (FileSize - sizeof(sizeof(CacheBucketIndexHeader))) / sizeof(DiskIndexEntry); + if (Header.EntryCount > ExpectedEntryCount) + { + return 0; + } + + InvalidGuard.Dismiss(); + + size_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + m_Configuration.PayloadAlignment = Header.PayloadAlignment; + + m_Payloads.reserve(Header.EntryCount); + m_Index.reserve(Header.EntryCount); + + BasicFileBuffer FileBuffer(ObjectIndexFile, 128 * 1024); + + uint64_t CurrentReadOffset = sizeof(CacheBucketIndexHeader); + uint64_t RemainingEntryCount = Header.EntryCount; + + std::string InvalidEntryReason; + while (RemainingEntryCount--) + { + const DiskIndexEntry* Entry = FileBuffer.MakeView<DiskIndexEntry>(CurrentReadOffset); + CurrentReadOffset += sizeof(DiskIndexEntry); + + if (!ValidateCacheBucketIndexEntry(*Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; } - ZEN_WARN("skipping invalid index file '{}'", IndexPath); + + const PayloadIndex EntryIndex = PayloadIndex(EntryCount); + m_Payloads.emplace_back(BucketPayload{.Location = Entry->Location}); + m_Index.insert_or_assign(Entry->Key, EntryIndex); + + EntryCount++; } - return 0; + + ZEN_ASSERT(EntryCount == m_Payloads.size()); + + m_AccessTimes.resize(EntryCount, AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(EntryCount); + } + + OutVersion = CacheBucketIndexHeader::Version2; + return Header.LogPosition; } uint64_t @@ -555,61 +929,73 @@ ZenCacheDiskLayer::CacheBucket::ReadLog(const std::filesystem::path& LogPath, ui { ZEN_TRACE_CPU("Z$::Disk::Bucket::ReadLog"); - if (std::filesystem::is_regular_file(LogPath)) + if (!std::filesystem::is_regular_file(LogPath)) { - uint64_t LogEntryCount = 0; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - }); - TCasLogFile<DiskIndexEntry> CasLog; - CasLog.Open(LogPath, CasLogFile::Mode::kRead); - if (CasLog.Initialize()) - { - uint64_t EntryCount = CasLog.GetLogCount(); - if (EntryCount < SkipEntryCount) - { - ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); - SkipEntryCount = 0; - } - LogEntryCount = EntryCount - SkipEntryCount; - uint64_t InvalidEntryCount = 0; - CasLog.Replay( - [&](const DiskIndexEntry& Record) { - std::string InvalidEntryReason; - if (Record.Location.Flags & DiskLocation::kTombStone) - { - m_Index.erase(Record.Key); - return; - } - if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); - ++InvalidEntryCount; - return; - } - PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); - m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); - m_Index.insert_or_assign(Record.Key, EntryIndex); - }, - SkipEntryCount); - m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); - if (m_Configuration.EnableReferenceCaching) + return 0; + } + + uint64_t LogEntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' log containing {} entries in {}", LogPath, LogEntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + TCasLogFile<DiskIndexEntry> CasLog; + CasLog.Open(LogPath, CasLogFile::Mode::kRead); + if (!CasLog.Initialize()) + { + return 0; + } + + const uint64_t EntryCount = CasLog.GetLogCount(); + if (EntryCount < SkipEntryCount) + { + ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); + SkipEntryCount = 0; + } + + LogEntryCount = EntryCount - SkipEntryCount; + uint64_t InvalidEntryCount = 0; + + CasLog.Replay( + [&](const DiskIndexEntry& Record) { + std::string InvalidEntryReason; + if (Record.Location.Flags & DiskLocation::kTombStone) { - m_FirstReferenceIndex.resize(m_Payloads.size()); + // Note: this leaves m_Payloads and other arrays with 'holes' in them + m_Index.erase(Record.Key); + return; } - if (InvalidEntryCount) + + if (!ValidateCacheBucketIndexEntry(Record, InvalidEntryReason)) { - ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", LogPath, InvalidEntryReason); + ++InvalidEntryCount; + return; } - return LogEntryCount; - } + PayloadIndex EntryIndex = PayloadIndex(m_Payloads.size()); + m_Payloads.emplace_back(BucketPayload{.Location = Record.Location}); + m_Index.insert_or_assign(Record.Key, EntryIndex); + }, + SkipEntryCount); + + m_AccessTimes.resize(m_Payloads.size(), AccessTime(GcClock::TickCount())); + + if (m_Configuration.EnableReferenceCaching) + { + m_FirstReferenceIndex.resize(m_Payloads.size()); } - return 0; + + if (InvalidEntryCount) + { + ZEN_WARN("found {} invalid entries in '{}'", InvalidEntryCount, m_BucketDir); + } + + return LogEntryCount; }; void -ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) +ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(const bool IsNew) { ZEN_TRACE_CPU("Z$::Disk::Bucket::OpenLog"); @@ -661,15 +1047,14 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) } else if (fs::is_regular_file(LogPath)) { - ZEN_WARN("removing invalid cas log at '{}'", LogPath); + ZEN_WARN("removing invalid log at '{}'", LogPath); std::filesystem::remove(LogPath); } } m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_Index.size()); + BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { size_t EntryIndex = Entry.second; @@ -679,19 +1064,19 @@ ZenCacheDiskLayer::CacheBucket::OpenLog(const bool IsNew) if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { m_StandaloneSize.fetch_add(Location.Size(), std::memory_order::relaxed); - continue; } - const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); - KnownLocations.push_back(BlockLocation); + else + { + const BlockStoreLocation& BlockLocation = Location.GetBlockLocation(m_Configuration.PayloadAlignment); + KnownBlocks.Add(BlockLocation.BlockIndex); + } } - - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNew || LogEntryCount > 0) { - MakeIndexSnapshot(); + WriteIndexSnapshot(); } - // TODO: should validate integrity of container files here } void @@ -981,20 +1366,7 @@ ZenCacheDiskLayer::CacheBucket::Flush() m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot(); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas)); + SaveSnapshot(); } catch (std::exception& Ex) { @@ -1003,113 +1375,85 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::SaveManifest(CbObject&& Manifest, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc) { - ZEN_TRACE_CPU("Z$::Disk::Bucket::SaveManifest"); try { - IoBuffer Buffer = Manifest.GetBuffer().AsIoBuffer(); - - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); - if (Error) - { - ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); - return; - } - bool EnoughSpace = Space.Free >= Buffer.GetSize() + 1024 * 512; - if (!EnoughSpace) - { - uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); - EnoughSpace = (Space.Free + ReclaimedSpace) >= Buffer.GetSize() + 1024 * 512; - } - if (!EnoughSpace) - { - ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", m_BucketDir, NiceBytes(Buffer.GetSize())); - return; - } - WriteFile(m_BucketDir / "zen_manifest", Buffer); - } - catch (std::exception& Err) - { - ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); - } -} + bool UseLegacyScheme = false; + uint64_t SidecarSize = 0; -CbObject -ZenCacheDiskLayer::CacheBucket::MakeManifest(IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - const std::vector<BucketPayload>& Payloads, - const std::vector<BucketMetaData>& MetaDatas) -{ - using namespace std::literals; + IoBuffer Buffer; + BucketManifestSerializer ManifestWriter; - ZEN_TRACE_CPU("Z$::Disk::Bucket::MakeManifest"); - - size_t ItemCount = Index.size(); - - // This tends to overestimate a little bit but it is still way more accurate than what we get with exponential growth - // And we don't need to reallocate theunderying buffer in almost every case - const size_t EstimatedSizePerItem = 54u; - const size_t ReserveSize = ItemCount == 0 ? 48u : RoundUp(32u + (ItemCount * EstimatedSizePerItem), 128); - CbObjectWriter Writer(ReserveSize); + { + std::vector<AccessTime> AccessTimes; + std::vector<BucketPayload> Payloads; + std::vector<BucketMetaData> MetaDatas; + IndexMap Index; - Writer << "BucketId"sv << m_BucketId; - Writer << "Version"sv << CurrentDiskBucketVersion; + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + WriteIndexSnapshot(); + // Note: this copy could be eliminated on shutdown to + // reduce memory usage and execution time + Index = m_Index; + Payloads = m_Payloads; + AccessTimes = m_AccessTimes; + MetaDatas = m_MetaDatas; + } - if (!Index.empty()) - { - Writer.AddInteger("Count"sv, gsl::narrow<std::uint64_t>(Index.size())); - Writer.BeginArray("Keys"sv); - for (auto& Kv : Index) - { - const IoHash& Key = Kv.first; - Writer.AddHash(Key); - } - Writer.EndArray(); + if (UseLegacyScheme) + { + Buffer = ManifestWriter.MakeManifest(m_BucketId, std::move(Index), std::move(AccessTimes), Payloads, MetaDatas); + } + else + { + const uint64_t EntryCount = Index.size(); + Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); + SidecarSize = ManifestWriter.GetSidecarSize(); + } - Writer.BeginArray("Timestamps"sv); - for (auto& Kv : Index) - { - GcClock::Tick AccessTime = AccessTimes[Kv.second]; - Writer.AddInteger(AccessTime); - } - Writer.EndArray(); + const uint64_t RequiredSpace = SidecarSize + Buffer.GetSize() + 1024 * 512; - if (!MetaDatas.empty()) - { - Writer.BeginArray("RawHash"sv); - for (auto& Kv : Index) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BucketDir, Error); + if (Error) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddHash(MetaDatas[Payload.MetaData].RawHash); - } - else - { - Writer.AddHash(IoHash::Zero); - } + ZEN_WARN("get disk space in '{}' FAILED, reason: '{}'", m_BucketDir, Error.message()); + return; + } + bool EnoughSpace = Space.Free >= RequiredSpace; + if (!EnoughSpace) + { + uint64_t ReclaimedSpace = ClaimDiskReserveFunc(); + EnoughSpace = (Space.Free + ReclaimedSpace) >= RequiredSpace; + } + if (!EnoughSpace) + { + ZEN_WARN("not enough free disk space in '{}'. FAILED to save manifest of size {}", + m_BucketDir, + NiceBytes(Buffer.GetSize())); + return; } - Writer.EndArray(); - Writer.BeginArray("RawSize"sv); - for (auto& Kv : Index) + if (!UseLegacyScheme) { - const BucketPayload& Payload = Payloads[Kv.second]; - if (Payload.MetaData) - { - Writer.AddInteger(MetaDatas[Payload.MetaData].RawSize); - } - else - { - Writer.AddInteger(0); - } + ManifestWriter.WriteSidecarFile(GetMetaPath(m_BucketDir, m_BucketName), + m_LogFlushPosition, + std::move(Index), + std::move(AccessTimes), + Payloads, + MetaDatas); } - Writer.EndArray(); } + + std::filesystem::path ManifestPath = GetManifestPath(m_BucketDir, m_BucketName); + WriteFile(ManifestPath, Buffer); + } + catch (std::exception& Err) + { + ZEN_WARN("writing manifest in '{}' FAILED, reason: '{}'", m_BucketDir, Err.what()); } - return Writer.Save(); } IoHash @@ -1683,20 +2027,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) try { - std::vector<AccessTime> AccessTimes; - std::vector<BucketPayload> Payloads; - std::vector<BucketMetaData> MetaDatas; - IndexMap Index; - { - RwLock::SharedLockScope IndexLock(m_IndexLock); - MakeIndexSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); - Index = m_Index; - Payloads = m_Payloads; - AccessTimes = m_AccessTimes; - MetaDatas = m_MetaDatas; - } - SaveManifest(MakeManifest(std::move(Index), std::move(AccessTimes), Payloads, MetaDatas), - [&]() { return GcCtx.ClaimGCReserve(); }); + SaveSnapshot([&]() { return GcCtx.ClaimGCReserve(); }); } catch (std::exception& Ex) { diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 2986cedf8..8d015d127 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -189,7 +189,6 @@ public: void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS -private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ @@ -225,21 +224,6 @@ private: #endif // ZEN_WITH_TESTS private: - GcManager& m_Gc; - std::atomic_uint64_t& m_OuterCacheMemoryUsage; - std::string m_BucketName; - std::filesystem::path m_BucketDir; - std::filesystem::path m_BlocksBasePath; - BucketConfiguration m_Configuration; - BlockStore m_BlockStore; - Oid m_BucketId; - std::atomic_bool m_IsFlushing{}; - - // These files are used to manage storage of small objects for this bucket - - TCasLogFile<DiskIndexEntry> m_SlogFile; - uint64_t m_LogFlushPosition = 0; - #pragma pack(push) #pragma pack(1) struct MetaDataIndex @@ -309,6 +293,22 @@ private: using IndexMap = tsl::robin_map<IoHash, PayloadIndex, IoHash::Hasher>; + private: + GcManager& m_Gc; + std::atomic_uint64_t& m_OuterCacheMemoryUsage; + std::string m_BucketName; + std::filesystem::path m_BucketDir; + std::filesystem::path m_BlocksBasePath; + BucketConfiguration m_Configuration; + BlockStore m_BlockStore; + Oid m_BucketId; + std::atomic_bool m_IsFlushing{}; + + // These files are used to manage storage of small objects for this bucket + + TCasLogFile<DiskIndexEntry> m_SlogFile; + uint64_t m_LogFlushPosition = 0; + std::atomic<uint64_t> m_DiskHitCount; std::atomic<uint64_t> m_DiskMissCount; std::atomic<uint64_t> m_DiskWriteCount; @@ -342,19 +342,9 @@ private: IoBuffer GetStandaloneCacheValue(ZenContentType ContentType, const IoHash& HashKey) const; void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; - void MakeIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); - uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); - uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); - void OpenLog(const bool IsNew); - CbObject MakeManifest(IndexMap&& Index, - std::vector<AccessTime>&& AccessTimes, - const std::vector<BucketPayload>& Payloads, - const std::vector<BucketMetaData>& MetaDatas); - void SaveManifest( - CbObject&& Manifest, - const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); CacheValueDetails::ValueDetails GetValueDetails(const IoHash& Key, PayloadIndex Index) const; - void CompactReferences(RwLock::ExclusiveLockScope&); + + void CompactReferences(RwLock::ExclusiveLockScope&); void SetReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex, std::span<IoHash> References); void RemoveReferences(RwLock::ExclusiveLockScope&, ReferenceIndex& FirstReferenceIndex); inline bool GetReferences(RwLock::SharedLockScope&, ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const @@ -368,12 +358,20 @@ private: ReferenceIndex AllocateReferenceEntry(RwLock::ExclusiveLockScope&, const IoHash& Key); bool LockedGetReferences(ReferenceIndex FirstReferenceIndex, std::vector<IoHash>& OutReferences) const; void ClearReferenceCache(); + void SetMetaData(BucketPayload& Payload, const ZenCacheDiskLayer::CacheBucket::BucketMetaData& MetaData); void RemoveMetaData(BucketPayload& Payload); BucketMetaData GetMetaData(const BucketPayload& Payload) const; void SetMemCachedData(BucketPayload& Payload, IoBuffer& MemCachedData); size_t RemoveMemCachedData(BucketPayload& Payload); + void InitializeIndexFromDisk(bool IsNew); + uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); + + void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); + void WriteIndexSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); + void CompactState(std::vector<BucketPayload>& Payloads, std::vector<AccessTime>& AccessTimes, std::vector<BucketMetaData>& MetaDatas, @@ -404,8 +402,10 @@ private: friend class DiskBucketReferenceChecker; friend class DiskBucketStoreCompactor; + friend class BucketManifestSerializer; }; +private: inline void TryMemCacheTrim() { if (m_Configuration.MemCacheTargetFootprintBytes == 0) diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index ff4df183e..6901323e3 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -39,6 +39,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # define ZEN_TEST_WITH_RUNNER 1 # include <zencore/testing.h> +# include <zenutil/zenutil.h> #endif #include <memory> @@ -295,6 +296,7 @@ test_main(int argc, char** argv) zen::zencore_forcelinktests(); zen::zenhttp_forcelinktests(); zen::zenstore_forcelinktests(); + zen::zenutil_forcelinktests(); zen::z$_forcelink(); zen::z$service_forcelink(); diff --git a/src/zenstore-test/zenstore-test.cpp b/src/zenstore-test/zenstore-test.cpp index 00c1136b6..6ef311324 100644 --- a/src/zenstore-test/zenstore-test.cpp +++ b/src/zenstore-test/zenstore-test.cpp @@ -4,6 +4,7 @@ #include <zencore/logging.h> #include <zencore/zencore.h> #include <zenstore/zenstore.h> +#include <zenutil/zenutil.h> #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC # include <sys/time.h> @@ -21,6 +22,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { #if ZEN_WITH_TESTS zen::zenstore_forcelinktests(); + zen::zenutil_forcelinktests(); zen::logging::InitializeLogging(); zen::MaximizeOpenFileCount(); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index e4a66daf4..89774f26d 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -227,7 +227,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } void -BlockStore::SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& KnownLocations) +BlockStore::BlockIndexSet::Add(uint32_t BlockIndex) +{ + if (!std::binary_search(begin(BlockIndexes), end(BlockIndexes), BlockIndex)) + { + auto It = std::lower_bound(begin(BlockIndexes), end(BlockIndexes), BlockIndex); + BlockIndexes.insert(It, BlockIndex); + } +} + +void +BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations) { ZEN_TRACE_CPU("BlockStore::SyncExistingBlocksOnDisk"); @@ -240,14 +250,18 @@ BlockStore::SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& Know { DeleteBlocks.insert(It.first); } - for (const auto& Entry : KnownLocations) + + for (const uint32_t BlockIndex : KnownLocations.GetBlockIndices()) { - DeleteBlocks.erase(Entry.BlockIndex); - if (auto It = m_ChunkBlocks.find(Entry.BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull()) + DeleteBlocks.erase(BlockIndex); + if (auto It = m_ChunkBlocks.find(BlockIndex); It != m_ChunkBlocks.end() && !It->second.IsNull()) { continue; } - MissingBlocks.insert(Entry.BlockIndex); + else + { + MissingBlocks.insert(BlockIndex); + } } for (std::uint32_t BlockIndex : MissingBlocks) { @@ -1473,7 +1487,12 @@ TEST_CASE("blockstore.clean.stray.blocks") CHECK(!ThirdChunk); // Recreate a fake block for a missing chunk location - Store.SyncExistingBlocksOnDisk({FirstChunkLocation, SecondChunkLocation, ThirdChunkLocation}); + BlockStore::BlockIndexSet KnownBlocks; + KnownBlocks.Add(FirstChunkLocation.BlockIndex); + KnownBlocks.Add(SecondChunkLocation.BlockIndex); + KnownBlocks.Add(ThirdChunkLocation.BlockIndex); + Store.SyncExistingBlocksOnDisk(KnownBlocks); + // We create a fake block for the location - we should still not be able to get the chunk CHECK(GetDirectoryContent(RootDirectory / "store", true, false).size() == 2); ThirdChunk = Store.TryGetChunk(ThirdChunkLocation); diff --git a/src/zenstore/caslog.cpp b/src/zenstore/caslog.cpp index c04324fbc..cf3bd76da 100644 --- a/src/zenstore/caslog.cpp +++ b/src/zenstore/caslog.cpp @@ -188,20 +188,30 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler, uint64_t SkipEntr LogBaseOffset += SkipEntryCount * m_RecordSize; LogEntryCount -= SkipEntryCount; - // This should really be streaming the data rather than just - // reading it into memory, though we don't tend to get very - // large logs so it may not matter + const uint64_t LogDataSize = LogEntryCount * m_RecordSize; + uint64_t LogDataRemain = LogDataSize; - const uint64_t LogDataSize = LogEntryCount * m_RecordSize; + const uint64_t MaxBufferSize = 1024 * 1024; std::vector<uint8_t> ReadBuffer; - ReadBuffer.resize(LogDataSize); + ReadBuffer.resize((Min(LogDataSize, MaxBufferSize) / m_RecordSize) * m_RecordSize); - m_File.Read(ReadBuffer.data(), LogDataSize, LogBaseOffset); + uint64_t ReadOffset = 0; - for (int i = 0; i < int(LogEntryCount); ++i) + while (LogDataRemain) { - Handler(ReadBuffer.data() + (i * m_RecordSize)); + const uint64_t BytesToRead = Min(ReadBuffer.size(), LogDataRemain); + const uint64_t EntriesToRead = BytesToRead / m_RecordSize; + + m_File.Read(ReadBuffer.data(), BytesToRead, LogBaseOffset + ReadOffset); + + for (int i = 0; i < int(EntriesToRead); ++i) + { + Handler(ReadBuffer.data() + (i * m_RecordSize)); + } + + LogDataRemain -= BytesToRead; + ReadOffset += BytesToRead; } m_AppendOffset = LogBaseOffset + (m_RecordSize * LogEntryCount); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index f28601771..95198fd59 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -945,10 +945,10 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint { ZEN_TRACE_CPU("CasContainer::ReadIndexFile"); - std::vector<CasDiskIndexEntry> Entries; - Stopwatch Timer; - const auto _ = MakeGuard([&] { - ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, Entries.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + uint64_t EntryCount = 0; + Stopwatch Timer; + const auto _ = MakeGuard([&] { + ZEN_INFO("read store '{}' index containing {} entries in {}", IndexPath, EntryCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); BasicFile ObjectIndexFile; @@ -963,21 +963,38 @@ CasContainerStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint (Header.Checksum == CasDiskIndexHeader::ComputeChecksum(Header)) && (Header.PayloadAlignment > 0) && (Header.EntryCount <= ExpectedEntryCount)) { - Entries.resize(Header.EntryCount); - ObjectIndexFile.Read(Entries.data(), Header.EntryCount * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); m_PayloadAlignment = Header.PayloadAlignment; - std::string InvalidEntryReason; - for (const CasDiskIndexEntry& Entry : Entries) + m_Locations.reserve(ExpectedEntryCount); + m_LocationMap.reserve(ExpectedEntryCount); + + std::vector<CasDiskIndexEntry> Entries; + Entries.resize(128 * 1024 / sizeof(CasDiskIndexEntry)); + + uint64_t RemainingEntries = Header.EntryCount; + + do { - if (!ValidateEntry(Entry, InvalidEntryReason)) + const uint64_t NumToRead = Min(RemainingEntries, Entries.size()); + Entries.resize(NumToRead); + + ObjectIndexFile.Read(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), sizeof(CasDiskIndexHeader)); + + std::string InvalidEntryReason; + for (const CasDiskIndexEntry& Entry : Entries) { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); - continue; + if (!ValidateEntry(Entry, InvalidEntryReason)) + { + ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", IndexPath, InvalidEntryReason); + continue; + } + m_LocationMap[Entry.Key] = m_Locations.size(); + m_Locations.push_back(Entry.Location); + ++EntryCount; } - m_LocationMap[Entry.Key] = m_Locations.size(); - m_Locations.push_back(Entry.Location); - } + + RemainingEntries -= NumToRead; + } while (RemainingEntries); OutVersion = CasDiskIndexHeader::CurrentVersion; return Header.LogPosition; @@ -1097,16 +1114,16 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - std::vector<BlockStoreLocation> KnownLocations; - KnownLocations.reserve(m_LocationMap.size()); + BlockStore::BlockIndexSet KnownBlocks; + for (const auto& Entry : m_LocationMap) { const BlockStoreDiskLocation& DiskLocation = m_Locations[Entry.second]; BlockStoreLocation BlockLocation = DiskLocation.Get(m_PayloadAlignment); - KnownLocations.emplace_back(std::move(BlockLocation)); + KnownBlocks.Add(BlockLocation.BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownLocations); + m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); if (IsNewStore || (LogEntryCount > 0)) { diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index b748fc8f6..82e1c71c6 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -134,9 +134,18 @@ public: void Initialize(const std::filesystem::path& BlocksBasePath, uint64_t MaxBlockSize, uint64_t MaxBlockCount); + struct BlockIndexSet + { + void Add(uint32_t BlockIndex); + std::span<const uint32_t> GetBlockIndices() const { return BlockIndexes; } + + private: + std::vector<uint32_t> BlockIndexes; + }; + // Ask the store to create empty blocks for all locations that does not have a block // Remove any block that is not referenced - void SyncExistingBlocksOnDisk(const std::vector<BlockStoreLocation>& KnownLocations); + void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownLocations); std::vector<uint32_t> GetBlocksToCompact(const std::unordered_map<uint32_t, uint64_t>& BlockUsage, uint32_t BlockUsageThresholdPercent); void Close(); diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp index d87652fde..60dabe31f 100644 --- a/src/zenstore/zenstore.cpp +++ b/src/zenstore/zenstore.cpp @@ -7,7 +7,6 @@ # include <zenstore/blockstore.h> # include <zenstore/gc.h> # include <zenstore/hashkeyset.h> -# include <zenutil/basicfile.h> # include "cas.h" # include "compactcas.h" @@ -18,7 +17,6 @@ namespace zen { void zenstore_forcelinktests() { - basicfile_forcelink(); CAS_forcelink(); filecas_forcelink(); blockstore_forcelink(); diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp index 1dce71e60..7d0edaa5d 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zenutil/basicfile.cpp @@ -76,16 +76,15 @@ BasicFile::Open(const std::filesystem::path& FileName, Mode InMode, std::error_c const DWORD dwShareMode = FILE_SHARE_READ | (EnumHasAllFlags(InMode, Mode::kPreventWrite) ? 0 : FILE_SHARE_WRITE) | (EnumHasAllFlags(InMode, Mode::kPreventDelete) ? 0 : FILE_SHARE_DELETE); - const DWORD dwFlagsAndAttributes = - FILE_ATTRIBUTE_NORMAL | (EnumHasAllFlags(InMode, Mode::kDeleteOnClose) ? FILE_FLAG_DELETE_ON_CLOSE : 0); - const HANDLE hTemplateFile = nullptr; - const HANDLE FileHandle = CreateFile(FileName.c_str(), - dwDesiredAccess, - dwShareMode, - /* lpSecurityAttributes */ nullptr, - dwCreationDisposition, - dwFlagsAndAttributes, - hTemplateFile); + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + const HANDLE hTemplateFile = nullptr; + const HANDLE FileHandle = CreateFile(FileName.c_str(), + dwDesiredAccess, + dwShareMode, + /* lpSecurityAttributes */ nullptr, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); if (FileHandle == INVALID_HANDLE_VALUE) { @@ -588,6 +587,8 @@ LockFile::Update(CbObject Payload, std::error_code& Ec) BasicFile::Write(Payload.GetBuffer(), 0, Ec); } +////////////////////////////////////////////////////////////////////////// + BasicFileBuffer::BasicFileBuffer(BasicFile& Base, uint64_t BufferSize) : m_Base(Base) , m_Buffer(nullptr) @@ -662,6 +663,79 @@ BasicFileBuffer::MakeView(uint64_t Size, uint64_t FileOffset) return MemoryView(m_Buffer + (FileOffset - m_BufferStart), Size); } +////////////////////////////////////////////////////////////////////////// + +BasicFileWriter::BasicFileWriter(BasicFile& Base, uint64_t BufferSize) +: m_Base(Base) +, m_Buffer(nullptr) +, m_BufferSize(BufferSize) +, m_BufferStart(0) +, m_BufferEnd(0) +{ + m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); +} + +BasicFileWriter::~BasicFileWriter() +{ + Flush(); + Memory::Free(m_Buffer); +} + +void +BasicFileWriter::Write(void* Data, uint64_t Size, uint64_t FileOffset) +{ + if (m_Buffer == nullptr || (Size >= m_BufferSize)) + { + m_Base.Write(Data, Size, FileOffset); + return; + } + + // Note that this only supports buffering of sequential writes! + + if (FileOffset != m_BufferEnd) + { + Flush(); + m_BufferStart = m_BufferEnd = FileOffset; + } + + while (Size) + { + const uint64_t RemainingBufferCapacity = m_BufferStart + m_BufferSize - m_BufferEnd; + const uint64_t BlockWriteBytes = Min(RemainingBufferCapacity, Size); + const uint64_t BufferWriteOffset = FileOffset - m_BufferStart; + + ZEN_ASSERT_SLOW(BufferWriteOffset < m_BufferSize); + ZEN_ASSERT_SLOW((BufferWriteOffset + BlockWriteBytes) <= m_BufferSize); + + memcpy(m_Buffer + BufferWriteOffset, Data, BlockWriteBytes); + + Size -= BlockWriteBytes; + m_BufferEnd += BlockWriteBytes; + FileOffset += BlockWriteBytes; + + if ((m_BufferEnd - m_BufferStart) == m_BufferSize) + { + Flush(); + } + } +} + +void +BasicFileWriter::Flush() +{ + const uint64_t BufferedBytes = m_BufferEnd - m_BufferStart; + + if (BufferedBytes == 0) + return; + + const uint64_t WriteOffset = m_BufferStart; + m_BufferStart = m_BufferEnd; + + m_Base.Write(m_Buffer, BufferedBytes, WriteOffset); +} + +////////////////////////////////////////////////////////////////////////// + /* ___________ __ \__ ___/___ _______/ |_ ______ diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h index 7797258e8..f25d9f23c 100644 --- a/src/zenutil/include/zenutil/basicfile.h +++ b/src/zenutil/include/zenutil/basicfile.h @@ -44,7 +44,6 @@ public: kModeMask = 0x0007, kPreventDelete = 0x1000'0000, // Do not open with delete sharing mode (prevent other processes from deleting file while open) kPreventWrite = 0x2000'0000, // Do not open with write sharing mode (prevent other processes from writing to file while open) - kDeleteOnClose = 0x4000'0000, // File should be deleted when the last handle is closed }; void Open(const std::filesystem::path& FileName, Mode Mode); @@ -138,6 +137,13 @@ public: void Read(void* Data, uint64_t Size, uint64_t FileOffset); MemoryView MakeView(uint64_t Size, uint64_t FileOffset); + template<typename T> + const T* MakeView(uint64_t FileOffset) + { + MemoryView View = MakeView(sizeof(T), FileOffset); + return reinterpret_cast<const T*>(View.GetData()); + } + private: BasicFile& m_Base; uint8_t* m_Buffer; @@ -147,6 +153,29 @@ private: uint64_t m_BufferEnd; }; +/** Adds a layer of buffered writing to a BasicFile + +This class is not intended for concurrent access, it is not thread safe. + +*/ + +class BasicFileWriter +{ +public: + BasicFileWriter(BasicFile& Base, uint64_t BufferSize); + ~BasicFileWriter(); + + void Write(void* Data, uint64_t Size, uint64_t FileOffset); + void Flush(); + +private: + BasicFile& m_Base; + uint8_t* m_Buffer; + const uint64_t m_BufferSize; + uint64_t m_BufferStart; + uint64_t m_BufferEnd; +}; + ZENCORE_API void basicfile_forcelink(); } // namespace zen diff --git a/src/zenutil/include/zenutil/zenutil.h b/src/zenutil/include/zenutil/zenutil.h index 14d21ea0d..662743de8 100644 --- a/src/zenutil/include/zenutil/zenutil.h +++ b/src/zenutil/include/zenutil/zenutil.h @@ -1,3 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. #pragma once + +namespace zen { + +void zenutil_forcelinktests(); + +} diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp new file mode 100644 index 000000000..df075ea3f --- /dev/null +++ b/src/zenutil/zenutil.cpp @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenutil/zenutil.h" + +#if ZEN_WITH_TESTS + +# include <zenutil/basicfile.h> + +namespace zen { + +void +zenutil_forcelinktests() +{ + basicfile_forcelink(); +} + +} // namespace zen + +#endif |