diff options
Diffstat (limited to 'zenserver/cache')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 184 |
1 files changed, 94 insertions, 90 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 1d43e9591..05c80c5bf 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1126,13 +1126,11 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen { BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment); - Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location); - if (!ChunkBlock) + OutValue.Value = m_BlockStore.TryGetChunk(Location); + if (!OutValue.Value) { return false; } - - OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size); OutValue.Value.SetContentType(Loc.GetContentType()); return true; @@ -1166,22 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } RwLock::SharedLockScope _(m_IndexLock); - - if (auto It = m_Index.find(HashKey); It != m_Index.end()) + auto It = m_Index.find(HashKey); + if (It == m_Index.end()) + { + return false; + } + IndexEntry& Entry = It.value(); + Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); + DiskLocation Location = Entry.Location; + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) { - IndexEntry& Entry = It.value(); - Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed); - DiskLocation Location = Entry.Location; + // We don't need to hold the index lock when we read a standalone file _.ReleaseNow(); - - if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) - { - return GetStandaloneCacheValue(Location, HashKey, OutValue); - } - return GetInlineCacheValue(Location, OutValue); + return GetStandaloneCacheValue(Location, HashKey, OutValue); } - - return false; + return GetInlineCacheValue(Location, OutValue); } void @@ -1470,14 +1467,13 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) WriteBlockTimeUs += ElapsedUs; WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs); }); + if (m_Index.empty()) { - if (m_Index.empty()) - { - ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); - return; - } - BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName); + return; } + BlockStoreState = m_BlockStore.GetReclaimSnapshotState(); + SaveManifest(); Index = m_Index; @@ -1515,6 +1511,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) Path.Reset(); BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); @@ -1530,8 +1527,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(Path.c_str(), Ec); + __.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); + } } if (Ec) @@ -1722,100 +1725,101 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - ExtendablePathBuilder<256> DataFilePath; - BuildPath(DataFilePath, HashKey); - TemporaryFile DataFile; std::error_code Ec; DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); - if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); + throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir)); } DataFile.WriteAll(Value.Value, Ec); - if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size()))); + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'", + NiceBytes(Value.Value.Size()), + DataFile.GetPath().string(), + m_BucketDir)); } - // Move file into place (atomically) - + ExtendablePathBuilder<256> DataFilePath; + BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - - if (Ec) + // We retry to open the file since it can be held open for read. + // This happens if the server processes a Get request for the file or + // if we are busy sending the file upstream + int RetryCount = 3; + do { - int RetryCount = 3; - - do + Ec.clear(); { - std::filesystem::path ParentPath = FsPath.parent_path(); - CreateDirectories(ParentPath); - + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + } - if (!Ec) + if (!Ec) + { + uint8_t EntryFlags = DiskLocation::kStandaloneFile; + + if (Value.Value.GetContentType() == ZenContentType::kCbObject) { - break; + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; } - std::error_code InnerEc; - const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); + DiskLocation Loc(Value.Value.Size(), EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - if (!InnerEc && ExistingFileSize == Value.Value.Size()) + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) { - // Concurrent write of same value? - return; + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + It.value() = Entry; } - // Semi arbitrary back-off - zen::Sleep(1000 * RetryCount); - } while (RetryCount--); + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + return; + } - if (Ec) + std::filesystem::path ParentPath = FsPath.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) { - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8())); + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); + if (!Ec) + { + // Retry without sleep + continue; + } + throw std::system_error( + Ec, + fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } - } - // Update index + ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'", + DataFile.GetPath().string(), + FsPath.string(), + m_BucketDir, + Ec.message()); - uint8_t EntryFlags = DiskLocation::kStandaloneFile; + // Semi arbitrary back-off + zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds + RetryCount--; + } while (RetryCount > 0); - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(Value.Value.Size(), EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, Entry}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - It.value() = Entry; - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } void @@ -1832,12 +1836,11 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) { + m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags); const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location}; m_SlogFile.Append(DiskIndexEntry); - m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed); - RwLock::ExclusiveLockScope __(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); if (auto It = m_Index.find(HashKey); It != m_Index.end()) { // TODO: should check if write is idempotent and bail out if it is? @@ -1852,6 +1855,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const m_Index.insert({HashKey, {Location, GcClock::TickCount()}}); } }); + m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed); } ////////////////////////////////////////////////////////////////////////// |