diff options
| author | Dan Engelbrecht <[email protected]> | 2022-05-06 12:55:04 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2022-05-06 12:55:04 +0200 |
| commit | 76fd97b9d864ab60d06859befdd4a3a3bf4abd97 (patch) | |
| tree | 9aa9e414f5f4bad23db487d77dae95a302585fd3 /zenserver/cache/structuredcachestore.cpp | |
| parent | Initialize upstream apply in background thread (#88) (diff) | |
| download | zen-76fd97b9d864ab60d06859befdd4a3a3bf4abd97.tar.xz zen-76fd97b9d864ab60d06859befdd4a3a3bf4abd97.zip | |
Fix standalone file lock in CacheBucket
Grab sharding lock when deleting files during GC
Don't hold sharding lock when sleeping in back-off due to file contention
Remove unneeded renaming logic when writing standalone cache values
Diffstat (limited to 'zenserver/cache/structuredcachestore.cpp')
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 170 |
1 files changed, 97 insertions, 73 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 2869191fd..163a3f2f2 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -1515,6 +1515,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) Path.Reset(); BuildPath(Path, Key); + fs::path FilePath = Path.ToPath(); { RwLock::SharedLockScope __(m_IndexLock); @@ -1530,8 +1531,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx) ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8()); continue; } - ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); - fs::remove(Path.c_str(), Ec); + __.ReleaseNow(); + + RwLock::ExclusiveLockScope ValueLock(LockForHash(Key)); + if (fs::is_regular_file(FilePath)) + { + ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8()); + fs::remove(FilePath, Ec); + } } if (Ec) @@ -1722,100 +1729,117 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac void ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) { - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - ExtendablePathBuilder<256> DataFilePath; BuildPath(DataFilePath, HashKey); + std::filesystem::path FsPath{DataFilePath.ToPath()}; - TemporaryFile DataFile; - - std::error_code Ec; - DataFile.CreateTemporary(m_BucketDir.c_str(), Ec); + auto UpdateIndex = [&]() { + uint8_t EntryFlags = DiskLocation::kStandaloneFile; - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir)); - } + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } - DataFile.WriteAll(Value.Value, Ec); + RwLock::ExclusiveLockScope _(m_IndexLock); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size()))); - } + DiskLocation Loc(Value.Value.Size(), EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - // Move file into place (atomically) + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + It.value() = Entry; + } - std::filesystem::path FsPath{DataFilePath.ToPath()}; + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + }; - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + std::error_code Ec; + BasicFile DataFile; - if (Ec) + // Happy path - directory structure exists and nobody is busy reading the file { - int RetryCount = 3; - - do + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); + if (!Ec) { - std::filesystem::path ParentPath = FsPath.parent_path(); - CreateDirectories(ParentPath); - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); - - if (!Ec) - { - break; - } - - std::error_code InnerEc; - const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc); - - if (!InnerEc && ExistingFileSize == Value.Value.Size()) + DataFile.WriteAll(Value.Value, Ec); + if (Ec) { - // Concurrent write of same value? - return; + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", + NiceBytes(Value.Value.Size()), + FsPath, + m_BucketDir)); + } } + ValueLock.ReleaseNow(); + UpdateIndex(); + return; + } + } - // Semi arbitrary back-off - zen::Sleep(1000 * RetryCount); - } while (RetryCount--); - + std::filesystem::path ParentPath = FsPath.parent_path(); + if (!std::filesystem::is_directory(ParentPath)) + { + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8())); + throw std::system_error( + Ec, + fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } } - // Update index - - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + // We retry to open the file since it can be held open for read + // This happens if the server gets a Get request for the file or + // if we are busy sending the file upstream + int RetryCount = 3; + do { - EntryFlags |= DiskLocation::kCompressed; - } - - RwLock::ExclusiveLockScope _(m_IndexLock); - - DiskLocation Loc(Value.Value.Size(), EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); + Ec.clear(); + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + DataFile.Open(FsPath, BasicFile::Mode::kTruncate, Ec); + if (!Ec) + { + DataFile.WriteAll(Value.Value, Ec); + if (Ec) + { + if (Ec) + { + throw std::system_error(Ec, + fmt::format("Failed to write payload ({} bytes) to file '{}' in '{}'", + NiceBytes(Value.Value.Size()), + FsPath, + m_BucketDir)); + } + } + ValueLock.ReleaseNow(); + UpdateIndex(); + return; + } + ZEN_INFO("Failed writing opening file '{}' for writing, pausing and retrying, reason '{}'", FsPath.string(), Ec.message()); + ValueLock.ReleaseNow(); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, Entry}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - It.value() = Entry; - } + // Semi arbitrary back-off + zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds + } while (RetryCount--); - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed); + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } void |