aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/cache')
-rw-r--r--zenserver/cache/structuredcachestore.cpp184
1 files changed, 94 insertions, 90 deletions
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 1d43e9591..05c80c5bf 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -1126,13 +1126,11 @@ ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, Zen
{
BlockStoreLocation Location = Loc.GetBlockLocation(m_PayloadAlignment);
- Ref<BlockStoreFile> ChunkBlock = m_BlockStore.GetChunkBlock(Location);
- if (!ChunkBlock)
+ OutValue.Value = m_BlockStore.TryGetChunk(Location);
+ if (!OutValue.Value)
{
return false;
}
-
- OutValue.Value = ChunkBlock->GetChunk(Location.Offset, Location.Size);
OutValue.Value.SetContentType(Loc.GetContentType());
return true;
@@ -1166,22 +1164,21 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
RwLock::SharedLockScope _(m_IndexLock);
-
- if (auto It = m_Index.find(HashKey); It != m_Index.end())
+ auto It = m_Index.find(HashKey);
+ if (It == m_Index.end())
+ {
+ return false;
+ }
+ IndexEntry& Entry = It.value();
+ Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
+ DiskLocation Location = Entry.Location;
+ if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
{
- IndexEntry& Entry = It.value();
- Entry.LastAccess.store(GcClock::TickCount(), std::memory_order_relaxed);
- DiskLocation Location = Entry.Location;
+ // We don't need to hold the index lock when we read a standalone file
_.ReleaseNow();
-
- if (Location.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- return GetStandaloneCacheValue(Location, HashKey, OutValue);
- }
- return GetInlineCacheValue(Location, OutValue);
+ return GetStandaloneCacheValue(Location, HashKey, OutValue);
}
-
- return false;
+ return GetInlineCacheValue(Location, OutValue);
}
void
@@ -1470,14 +1467,13 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
WriteBlockTimeUs += ElapsedUs;
WriteBlockLongestTimeUs = std::max(ElapsedUs, WriteBlockLongestTimeUs);
});
+ if (m_Index.empty())
{
- if (m_Index.empty())
- {
- ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
- return;
- }
- BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+ ZEN_INFO("garbage collect SKIPPED, for '{}', container is empty", m_BucketDir / m_BucketName);
+ return;
}
+ BlockStoreState = m_BlockStore.GetReclaimSnapshotState();
+
SaveManifest();
Index = m_Index;
@@ -1515,6 +1511,7 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
Path.Reset();
BuildPath(Path, Key);
+ fs::path FilePath = Path.ToPath();
{
RwLock::SharedLockScope __(m_IndexLock);
@@ -1530,8 +1527,14 @@ ZenCacheDiskLayer::CacheBucket::CollectGarbage(GcContext& GcCtx)
ZEN_DEBUG("skipping z$ delete standalone of file '{}' FAILED, it has been added back", Path.ToUtf8());
continue;
}
- ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
- fs::remove(Path.c_str(), Ec);
+ __.ReleaseNow();
+
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(Key));
+ if (fs::is_regular_file(FilePath))
+ {
+ ZEN_DEBUG("deleting standalone cache file '{}'", Path.ToUtf8());
+ fs::remove(FilePath, Ec);
+ }
}
if (Ec)
@@ -1722,100 +1725,101 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
void
ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value)
{
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- ExtendablePathBuilder<256> DataFilePath;
- BuildPath(DataFilePath, HashKey);
-
TemporaryFile DataFile;
std::error_code Ec;
DataFile.CreateTemporary(m_BucketDir.c_str(), Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to open temporary file for put at '{}'", m_BucketDir));
+ throw std::system_error(Ec, fmt::format("Failed to open temporary file for put in '{}'", m_BucketDir));
}
DataFile.WriteAll(Value.Value, Ec);
-
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write payload ({} bytes) to file", NiceBytes(Value.Value.Size())));
+ throw std::system_error(Ec,
+ fmt::format("Failed to write payload ({} bytes) to temporary file '{}' for put in '{}'",
+ NiceBytes(Value.Value.Size()),
+ DataFile.GetPath().string(),
+ m_BucketDir));
}
- // Move file into place (atomically)
-
+ ExtendablePathBuilder<256> DataFilePath;
+ BuildPath(DataFilePath, HashKey);
std::filesystem::path FsPath{DataFilePath.ToPath()};
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
-
- if (Ec)
+ // We retry to open the file since it can be held open for read.
+ // This happens if the server processes a Get request for the file or
+ // if we are busy sending the file upstream
+ int RetryCount = 3;
+ do
{
- int RetryCount = 3;
-
- do
+ Ec.clear();
{
- std::filesystem::path ParentPath = FsPath.parent_path();
- CreateDirectories(ParentPath);
-
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ }
- if (!Ec)
+ if (!Ec)
+ {
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
+
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
{
- break;
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
}
- std::error_code InnerEc;
- const uint64_t ExistingFileSize = std::filesystem::file_size(FsPath, InnerEc);
+ DiskLocation Loc(Value.Value.Size(), EntryFlags);
+ IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
- if (!InnerEc && ExistingFileSize == Value.Value.Size())
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
{
- // Concurrent write of same value?
- return;
+ // Previously unknown object
+ m_Index.insert({HashKey, Entry});
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ It.value() = Entry;
}
- // Semi arbitrary back-off
- zen::Sleep(1000 * RetryCount);
- } while (RetryCount--);
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ return;
+ }
- if (Ec)
+ std::filesystem::path ParentPath = FsPath.parent_path();
+ if (!std::filesystem::is_directory(ParentPath))
{
- throw std::system_error(Ec, fmt::format("Failed to finalize file '{}'", DataFilePath.ToUtf8()));
+ Ec.clear();
+ std::filesystem::create_directories(ParentPath, Ec);
+ if (!Ec)
+ {
+ // Retry without sleep
+ continue;
+ }
+ throw std::system_error(
+ Ec,
+ fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
}
- }
- // Update index
+ ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'",
+ DataFile.GetPath().string(),
+ FsPath.string(),
+ m_BucketDir,
+ Ec.message());
- uint8_t EntryFlags = DiskLocation::kStandaloneFile;
+ // Semi arbitrary back-off
+ zen::Sleep(200 * (4 - RetryCount)); // Sleep at most for a total of 2 seconds
+ RetryCount--;
+ } while (RetryCount > 0);
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- RwLock::ExclusiveLockScope _(m_IndexLock);
-
- DiskLocation Loc(Value.Value.Size(), EntryFlags);
- IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
-
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- m_Index.insert({HashKey, Entry});
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- It.value() = Entry;
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- m_TotalSize.fetch_add(Loc.Size(), std::memory_order::relaxed);
+ throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
}
void
@@ -1832,12 +1836,11 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
EntryFlags |= DiskLocation::kCompressed;
}
- m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](BlockStoreLocation BlockStoreLocation) {
+ m_BlockStore.WriteChunk(Value.Value.Data(), Value.Value.Size(), m_PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) {
DiskLocation Location(BlockStoreLocation, m_PayloadAlignment, EntryFlags);
const DiskIndexEntry DiskIndexEntry{.Key = HashKey, .Location = Location};
m_SlogFile.Append(DiskIndexEntry);
- m_TotalSize.fetch_add(BlockStoreLocation.Size, std::memory_order::relaxed);
- RwLock::ExclusiveLockScope __(m_IndexLock);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
if (auto It = m_Index.find(HashKey); It != m_Index.end())
{
// TODO: should check if write is idempotent and bail out if it is?
@@ -1852,6 +1855,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
m_Index.insert({HashKey, {Location, GcClock::TickCount()}});
}
});
+ m_TotalSize.fetch_add(Value.Value.Size(), std::memory_order::relaxed);
}
//////////////////////////////////////////////////////////////////////////