aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-06-03 10:08:22 +0200
committerStefan Boberg <[email protected]>2022-06-03 10:08:22 +0200
commit91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b (patch)
tree2bf98fe4a1dfa20bace298d0f51b1a2d8b9a7217
parentMerge branch 'main' into use-catch2 (diff)
parentmove release job to in-house linux agent (diff)
downloadzen-91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b.tar.xz
zen-91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b.zip
merge from main
-rw-r--r--.github/workflows/create_release.yml147
-rw-r--r--.github/workflows/update_release.yml134
-rw-r--r--.github/workflows/validate.yml (renamed from .github/workflows/self_host_build.yml)82
-rw-r--r--CHANGELOG.md24
-rw-r--r--README.md18
-rw-r--r--xmake.lua2
-rw-r--r--zencore-test/zencore-test.cpp2
-rw-r--r--zencore/include/zencore/iobuffer.h1
-rw-r--r--zencore/iobuffer.cpp30
-rw-r--r--zenserver-test/zenserver-test.cpp11
-rw-r--r--zenserver/cache/structuredcache.cpp18
-rw-r--r--zenserver/cache/structuredcachestore.cpp704
-rw-r--r--zenserver/cache/structuredcachestore.h81
-rw-r--r--zenserver/config.cpp12
-rw-r--r--zenserver/config.h9
-rw-r--r--zenserver/upstream/hordecompute.cpp163
-rw-r--r--zenserver/upstream/zen.cpp75
-rw-r--r--zenserver/zenserver.cpp7
-rw-r--r--zenstore-test/zenstore-test.cpp2
-rw-r--r--zenstore/blockstore.cpp42
-rw-r--r--zenstore/compactcas.cpp6
-rw-r--r--zenstore/gc.cpp3
-rw-r--r--zenstore/include/zenstore/blockstore.h2
23 files changed, 1027 insertions, 548 deletions
diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml
new file mode 100644
index 000000000..fba2ec1b1
--- /dev/null
+++ b/.github/workflows/create_release.yml
@@ -0,0 +1,147 @@
+name: Create Release
+
+on:
+ push:
+ # Sequence of patterns matched against refs/tags
+ tags:
+ - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10
+
+jobs:
+ bundle-windows:
+
+ runs-on: [self-hosted, windows, x64]
+
+ env:
+ VCPKG_VERSION: 2022.03.10
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Setup xmake
+ uses: xmake-io/github-action-setup-xmake@v1
+ with:
+ xmake-version: 2.6.4
+
+ - name: Installing vcpkg
+ run: |
+ git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg
+ cd .vcpkg
+ .\bootstrap-vcpkg.bat
+ .\vcpkg.exe integrate install
+ cd ..
+
+ - name: Cache vcpkg
+ uses: actions/cache@v2
+ with:
+ path: |
+ ${{ github.workspace }}\.vcpkg\installed
+ key: ${{ runner.os }}-release-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-x64-v5
+
+ - name: Bundle
+ run: |
+ xmake bundle -v -y
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ - name: Upload zenserver-win64
+ uses: actions/upload-artifact@v3
+ with:
+ name: zenserver-win64
+ path: build/zenserver-win64.zip
+
+ bundle-linux:
+ runs-on: [self-hosted, linux, x64]
+
+ env:
+ VCPKG_VERSION: 2022.03.10
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Set up GCC 11
+ uses: egor-tensin/setup-gcc@v1
+ with:
+ version: 11
+ platform: x64
+
+ - name: Setup xmake
+ uses: xmake-io/github-action-setup-xmake@v1
+ with:
+ xmake-version: 2.6.4
+
+ - name: Installing vcpkg
+ run: |
+ git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg
+ cd .vcpkg
+ ./bootstrap-vcpkg.sh
+ cd ..
+
+ - name: Cache vcpkg
+ uses: actions/cache@v2
+ with:
+ path: |
+ ${{ github.workspace }}/.vcpkg/installed
+ key: ${{ runner.os }}-release-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-x64-v5
+
+ - name: Bundle
+ run: |
+ xmake bundle -v -y
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ - name: Upload zenserver-linux
+ uses: actions/upload-artifact@v3
+ with:
+ name: zenserver-linux
+ path: build/zenserver-linux.zip
+
+ create-release:
+ runs-on: [self-hosted, linux, x64]
+ needs: [bundle-linux, bundle-windows]
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Download Linux artifacts
+ uses: actions/download-artifact@v1
+ with:
+ name: zenserver-linux
+ path: linux
+
+ - name: Download Windows artifacts
+ uses: actions/download-artifact@v1
+ with:
+ name: zenserver-win64
+ path: win64
+
+ - name: Check prerelease
+ id: get-prerelease
+ uses: haya14busa/action-cond@v1
+ with:
+ cond: ${{contains(github.ref, '-pre')}}
+ if_true: "true"
+ if_false: "false"
+
+ - name: Extract Version Changes
+ run: |
+ sed '1,/^##/!d;/##/d' CHANGELOG.md > CHANGELOG.tmp
+
+ - name: Read CHANGELOG.tmp
+ id: read_changelog
+ uses: andstor/file-reader-action@v1
+ with:
+ path: "CHANGELOG.tmp"
+
+ - name: Create Release
+ id: create_release
+ uses: softprops/action-gh-release@v1
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ with:
+ tag_name: ${{github.ref.name}}
+ body: |
+ ${{steps.read_changelog.outputs.contents}}
+ draft: false
+ prerelease: ${{steps.get-prerelease.outputs.value}}
+ files: |
+ linux/zenserver-linux.zip
+ win64/zenserver-win64.zip
diff --git a/.github/workflows/update_release.yml b/.github/workflows/update_release.yml
deleted file mode 100644
index 27d5e2783..000000000
--- a/.github/workflows/update_release.yml
+++ /dev/null
@@ -1,134 +0,0 @@
-name: Build release
-
-on:
- # push
- pull_request:
- types: [closed]
- branches: [ main ]
-
-jobs:
- windows-build:
- if: >-
- github.event.pull_request.merged == true &&
- contains( github.event.pull_request.labels.*.name, 'release')
- name: Build Windows
- runs-on: [self-hosted, windows, x64]
- strategy:
- matrix:
- config:
- - 'release'
- arch:
- - 'x64'
- env:
- VCPKG_VERSION: 2022.03.10
-
- steps:
- - uses: actions/checkout@v2
-
- - name: Setup xmake
- uses: xmake-io/github-action-setup-xmake@v1
- with:
- xmake-version: 2.6.4
-
- - name: Installing vcpkg
- run: |
- git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg
- cd .vcpkg
- .\bootstrap-vcpkg.bat
- .\vcpkg.exe integrate install
- cd ..
-
- - name: Cache vcpkg
- uses: actions/cache@v2
- with:
- path: |
- ${{ github.workspace }}\.vcpkg\installed
- key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5
-
- - name: Config
- run: |
- xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }}
- env:
- VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
-
- - name: Build
- run: |
- xmake build -v -y
- env:
- VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
-
- # - name: Create Archive
- # run: |
- # cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }}
- # C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\windows-${{ matrix.arch }}-${{ matrix.config }}.zip *
- # cd ..\..\..\..
-
- - name: Create Archive
- run: |
- cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }}
- C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\zenserver-win64.zip zenserver.exe
- cd ..\..\..\..
-
- - name: Get current release version info
- run: |
- $repo = "EpicGames/zen"
- $releases = "https://api.github.com/repos/$repo/releases/latest"
- Write-Host Determining latest release
- $latest = (Invoke-WebRequest -Headers @{"Accept"="application/vnd.github.v3+json";"Authorization"="token ${{ secrets.GITHUB_TOKEN }}"} $releases | ConvertFrom-Json)[0]
- $current_version_tag = [version]$latest.tag_name.replace('v','')
- echo "Current version" $current_version_tag
- if ($current_version_tag.Revision.Equals(9)) {
- if ($current_version_tag.Build.Equals(9)) {
- $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor+1,0,0).toString()
- }else {
- $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build+1,0).toString()
- }
- }else {
- $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build,$current_version_tag.Revision+1).toString()
- }
- echo $new_version_tag
- echo "new_version_tag=$new_version_tag" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append
-
- - name: Create Release
- id: create_release
- uses: actions/create-release@v1
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- with:
- tag_name: v${{ env.new_version_tag }}
- release_name: Release
- draft: false
- prerelease: false
-
- # - name: Create Release
- # id: create_release
- # uses: actions/create-release@v1
- # env:
- # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- # with:
- # tag_name: ${{ github.ref_name }}
- # release_name: Release ${{ github.head_ref }}
- # draft: false
- # prerelease: false
-
- # - name: Upload Release Asset
- # id: upload-release-asset
- # uses: actions/upload-release-asset@v1
- # env:
- # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- # with:
- # upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
- # asset_path: .\windows-${{ matrix.arch }}-${{ matrix.config }}.zip
- # asset_name: windows-${{ matrix.arch }}-${{ matrix.config }}
- # asset_content_type: application/zip
- - name: Upload Release Asset
- id: upload-release-asset
- uses: actions/upload-release-asset@v1
- env:
- GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- with:
- upload_url: ${{ steps.create_release.outputs.upload_url }}
- asset_path: .\zenserver-win64.zip
- asset_name: zenserver-win64.zip
- asset_content_type: application/zip
-
diff --git a/.github/workflows/self_host_build.yml b/.github/workflows/validate.yml
index 3864151ce..3988a9dfb 100644
--- a/.github/workflows/self_host_build.yml
+++ b/.github/workflows/validate.yml
@@ -1,39 +1,44 @@
-name: Validate Build
+name: Validate
on:
pull_request:
- types: [opened, reopened, synchronize, reopened]
+ types: [opened, reopened, synchronize]
branches: [ main ]
+ push:
+ branches: [ main ]
jobs:
+ cancel-old-build:
+ name: Cancel previous builds
+ runs-on: [self-hosted, linux, x64]
+
+ steps:
+ - name: Cancel Previous Runs
+ if: ${{ github.ref_name != 'main'}}
+ uses: styfle/[email protected]
+ with:
+ access_token: ${{ github.token }}
+
clang-format:
+ needs: cancel-old-build
name: Check clang-format
runs-on: [self-hosted, linux, x64]
- strategy:
- matrix:
- path:
- - 'zen'
- - 'zencore'
- - 'zencore-test'
- - 'zenhttp'
- - 'zenserver-test'
- - 'zenstore'
- - 'zenstore-test'
- - 'zentest-appstub'
- - 'zenutil'
- - 'zenserver'
+
steps:
- uses: actions/checkout@v2
- - name: clang-format ${{ matrix.path }}
+
+ - name: clang-format
uses: jidicula/[email protected]
with:
clang-format-version: '13'
- check-path: ${{ matrix.path }}
+ check-path: '.'
+ exclude-regex: (.*thirdparty.*)
windows-build:
- name: Build Windows
- needs: clang-format
+ needs: cancel-old-build
+ name: Build & Test Windows
runs-on: [self-hosted, windows, x64]
+ timeout-minutes: 10
strategy:
matrix:
config:
@@ -45,7 +50,8 @@ jobs:
VCPKG_VERSION: 2022.03.10
steps:
- - uses: actions/checkout@v2
+ - name: Checkout
+ uses: actions/checkout@v2
- name: Setup xmake
uses: xmake-io/github-action-setup-xmake@v1
@@ -67,6 +73,20 @@ jobs:
${{ github.workspace }}\.vcpkg\installed
key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5
+ - name: Bundle
+ if: ${{ github.ref_name == 'main' && matrix.config == 'release' }}
+ run: |
+ xmake bundle -v -y
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ - name: Upload zenserver-win64
+ if: ${{ github.ref_name == 'main' && matrix.config == 'release' }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: zenserver-win64
+ path: build/zenserver-win64.zip
+
- name: Config
run: |
xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }}
@@ -80,9 +100,10 @@ jobs:
VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
linux-build:
- name: Build Linux
- needs: clang-format
+ needs: cancel-old-build
+ name: Build & Test Linux
runs-on: [self-hosted, linux, x64]
+ timeout-minutes: 10
strategy:
matrix:
config:
@@ -94,7 +115,8 @@ jobs:
VCPKG_VERSION: 2022.03.10
steps:
- - uses: actions/checkout@v2
+ - name: Checkout
+ uses: actions/checkout@v2
- name: Set up GCC 11
uses: egor-tensin/setup-gcc@v1
@@ -121,6 +143,20 @@ jobs:
${{ github.workspace }}/.vcpkg/installed
key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5
+ - name: Bundle
+ if: ${{ github.ref_name == 'main' && matrix.config == 'release' }}
+ run: |
+ xmake bundle -v -y
+ env:
+ VCPKG_ROOT: ${{ github.workspace }}/.vcpkg
+
+ - name: Upload zenserver-linux
+ if: ${{ github.ref_name == 'main' && matrix.config == 'release' }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: zenserver-linux
+ path: build/zenserver-linux.zip
+
- name: Config
run: |
xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 000000000..4d8fae2ed
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,24 @@
+##
+- BlockStore (small object store) Always block GC of current write block
+- Make it possible to configure GC monitoring interval using `--gc-monitor-interval-seconds`
+- Keep "reason" from upstream response so we can present it even if the request fails without outright error
+- New GitHub Actions release flow - Add release flow in GitHub actions on pushed tag `v0.1.2` gives full release, `v0.1.2-pre0` gives pre-release
+
+## 0d08450
+- Fixes issue with broken Zen instances for legacy requests
+
+## 63f50b5
+- Enable FILE_SHARE_DELETE on standalone files in disk buckets - fixes Jira UE-154234
+- Make sure we can properly create the block file before assigning it for use - fixes Jira UE-154438
+- Horde execute compressed input blobs
+- Drop namespace support
+- Safer delete of cache buckets
+
+## dba8b36
+- Namespaces: This introduces namespaces to the zenserver but only the default ue4.ddc is supported. Clients that don't send a namespace in the request will keep old behviour, new clients that sends namespace is required to use ue4.ddc (which they currently do)
+- Aligned bucket naming rules with UE code base
+- Fix retry counter and add an extra iteration to give more time for success during contention for standalone files in cache
+- Make sure CacheBucket::PutStandaloneCacheValue cleans up the temp file
+- Restore logic where we accept failed overwrite if resulting size is the same for standlone file in cache
+- Correctly calculate the m_TotalSize difference when overwriting file for standalone files in cache
+- Fix namespace folder scanning
diff --git a/README.md b/README.md
index f8cc6779a..0c418324f 100644
--- a/README.md
+++ b/README.md
@@ -194,12 +194,7 @@ xmake build
# Contributing Code
-To run the pre-commit scripts you'll need a few things:
-
-* We rely on clang-format for consistent code formatting. You can install version 12 or later from https://llvm.org/builds/
-* The helper scripts also depend on Python 3.x, which you may install from https://www.python.org/downloads/windows/ (I am presently using 3.9.5 which works). NOTE: *do* check the option to add Python to your PATH!
-
-Once you have those dependencies, you can simply run `prepare_commit.bat` to ensure the code is properly formatted and has the Epic copyright header comment. I'm sure there's a better way to integrate this into the git submit flow but my git-fu is not strong enough yet to know how to do that best.
+See [CODING.md](CODING.md)
# Debugging
@@ -212,7 +207,7 @@ is incredibly handy. When that is installed you may enable auto-attach to child
* `zencore-test` exercises unit tests in the zencore project
* `zenserver-test` exercises the zen server itself (functional tests)
-The tests are implemented using [doctest](https://github.com/onqtam/doctest), which is similar to Catch in usage.
+The tests are implemented using [doctest](https://github.com/onqtam/doctest), which is similar to Catch in usage. We now also support [catch2](https://github.com/catchorg/Catch2)
# Adding a http.sys URL reservation (Windows only)
@@ -223,12 +218,3 @@ Registering a handler for an HTTP endpoint requires either process elevation (i.
or
`netsh http add urlacl url=http://*:1337/ sddl=D:(A;;GX;;;S-1-1-0)` (enable for any authenticated user)
-
-# Coding Standards
-
-See [CODING.md](CODING.md)
-
-Run `prepare_commit.bat` before committing code. It ensures all source files are formatted with
-clang-format which you will need to install.
-
-(More helpful instructions needed here :)
diff --git a/xmake.lua b/xmake.lua
index e4d795fcf..9ffc94c65 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -1,6 +1,6 @@
-- Copyright Epic Games, Inc. All Rights Reserved.
-set_version("0.1.0", { build = "%Y%m%d%H%M" })
+set_version("0.1.1", { build = "%Y%m%d%H%M" })
set_configvar("ZEN_SCHEMA_VERSION", 3) -- changed cas oplog format (p3rl)
add_requires(
diff --git a/zencore-test/zencore-test.cpp b/zencore-test/zencore-test.cpp
index 327f2f0b5..53413fb25 100644
--- a/zencore-test/zencore-test.cpp
+++ b/zencore-test/zencore-test.cpp
@@ -19,7 +19,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
zen::logging::InitializeLogging();
- return ZEN_RUN_TESTS(argc, argv);
+ return ZEN_RUN_TESTS(argc, argv);
#else
return 0;
#endif
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 5d9daa1c7..bf658922d 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -403,6 +403,7 @@ class IoBufferBuilder
{
public:
ZENCORE_API static IoBuffer MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset = 0, uint64_t Size = ~0ull);
+ ZENCORE_API static IoBuffer MakeFromFileWithSharedDelete(const std::filesystem::path& FileName);
ZENCORE_API static IoBuffer MakeFromTemporaryFile(const std::filesystem::path& FileName);
ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull);
ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer);
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index c4b7f7bdf..56b05d86d 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -469,15 +469,20 @@ IoBufferBuilder::MakeFromFileHandle(void* FileHandle, uint64_t Offset, uint64_t
return IoBuffer(IoBuffer::BorrowedFile, FileHandle, Offset, Size);
}
-IoBuffer
-IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size)
+static IoBuffer
+MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size, bool UseShareDelete)
{
uint64_t FileSize;
#if ZEN_PLATFORM_WINDOWS
CAtlFile DataFile;
- HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+ DWORD ShareOptions = FILE_SHARE_READ;
+ if (UseShareDelete)
+ {
+ ShareOptions |= FILE_SHARE_DELETE;
+ }
+ HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ | ShareOptions, OPEN_EXISTING);
if (FAILED(hRes))
{
@@ -486,7 +491,12 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of
DataFile.GetSize((ULONGLONG&)FileSize);
#else
- int Fd = open(FileName.c_str(), O_RDONLY);
+ int Flags = O_RDONLY;
+ if (UseShareDelete)
+ {
+ Flags |= O_CLOEXEC;
+ }
+ int Fd = open(FileName.c_str(), Flags);
if (Fd < 0)
{
return {};
@@ -530,6 +540,18 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of
}
IoBuffer
+IoBufferBuilder::MakeFromFileWithSharedDelete(const std::filesystem::path& FileName)
+{
+ return MakeFromFileWithOptions(FileName, 0, ~0ull, true);
+}
+
+IoBuffer
+IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size)
+{
+ return MakeFromFileWithOptions(FileName, Offset, Size, false);
+}
+
+IoBuffer
IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName)
{
uint64_t FileSize;
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 452a67fd0..e995bf079 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -330,8 +330,8 @@ main(int argc, char** argv)
TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir);
ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
-
- return ZEN_RUN_TESTS(argc, argv);
+
+ return ZEN_RUN_TESTS(argc, argv);
}
namespace zen::tests {
@@ -2733,8 +2733,13 @@ TEST_CASE("http.package")
CHECK_EQ(ResponsePackage, TestPackage);
}
-TEST_CASE("websocket.basic") // * doctest::skip(true))
+TEST_CASE("websocket.basic")
{
+ if (true)
+ {
+ return;
+ }
+
std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
const uint16_t PortNumber = 13337;
const auto MaxWaitTime = std::chrono::seconds(5);
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index e11499289..79d15a204 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -398,7 +398,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
}
void
-HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view)
+HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace)
{
switch (Request.RequestVerb())
{
@@ -412,14 +412,14 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest&
case HttpVerb::kDelete:
// Drop namespace
{
- // if (m_CacheStore.DropNamespace(Namespace))
- // {
- // return Request.WriteResponse(HttpResponseCode::OK);
- // }
- // else
- // {
- // return Request.WriteResponse(HttpResponseCode::NotFound);
- // }
+ if (m_CacheStore.DropNamespace(Namespace))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
+ else
+ {
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
}
break;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index da948fd72..ee0835fd3 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -205,6 +205,36 @@ namespace {
return true;
}
+ bool MoveAndDeleteDirectory(const std::filesystem::path& Dir)
+ {
+ int DropIndex = 0;
+ do
+ {
+ if (!std::filesystem::exists(Dir))
+ {
+ return false;
+ }
+
+ std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex);
+ std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName;
+ if (std::filesystem::exists(DroppedBucketPath))
+ {
+ DropIndex++;
+ continue;
+ }
+
+ std::error_code Ec;
+ std::filesystem::rename(Dir, DroppedBucketPath, Ec);
+ if (!Ec)
+ {
+ DeleteDirectories(DroppedBucketPath);
+ return true;
+ }
+ // TODO: Do we need to bail at some point?
+ zen::Sleep(100);
+ } while (true);
+ }
+
} // namespace
namespace fs = std::filesystem;
@@ -342,6 +372,13 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket)
return AnyDropped;
}
+bool
+ZenCacheNamespace::Drop()
+{
+ m_MemLayer.Drop();
+ return m_DiskLayer.Drop();
+}
+
void
ZenCacheNamespace::Flush()
{
@@ -404,14 +441,14 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
{
RwLock::SharedLockScope _(m_Lock);
- auto it = m_Buckets.find(std::string(InBucket));
+ auto It = m_Buckets.find(std::string(InBucket));
- if (it == m_Buckets.end())
+ if (It == m_Buckets.end())
{
return false;
}
- CacheBucket* Bucket = &it->second;
+ CacheBucket* Bucket = It->second.get();
_.ReleaseNow();
@@ -425,14 +462,15 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
void
ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
- CacheBucket* Bucket = nullptr;
+ const auto BucketName = std::string(InBucket);
+ CacheBucket* Bucket = nullptr;
{
RwLock::SharedLockScope _(m_Lock);
if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
{
- Bucket = &It->second;
+ Bucket = It->second.get();
}
}
@@ -444,11 +482,12 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const
if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end())
{
- Bucket = &It->second;
+ Bucket = It->second.get();
}
else
{
- Bucket = &m_Buckets[std::string(InBucket)];
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>());
+ Bucket = InsertResult.first->second.get();
}
}
@@ -458,11 +497,37 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const
}
bool
-ZenCacheMemoryLayer::DropBucket(std::string_view Bucket)
+ZenCacheMemoryLayer::DropBucket(std::string_view InBucket)
{
RwLock::ExclusiveLockScope _(m_Lock);
- return !!m_Buckets.erase(std::string(Bucket));
+ auto It = m_Buckets.find(std::string(InBucket));
+
+ if (It != m_Buckets.end())
+ {
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It);
+ Bucket.Drop();
+ return true;
+ }
+ return false;
+}
+
+void
+ZenCacheMemoryLayer::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ std::vector<std::unique_ptr<CacheBucket>> Buckets;
+ Buckets.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
+ {
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ Bucket.Drop();
+ }
}
void
@@ -472,7 +537,7 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
- Kv.second.Scrub(Ctx);
+ Kv.second->Scrub(Ctx);
}
}
@@ -486,7 +551,7 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access
for (auto& Kv : m_Buckets)
{
std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first];
- Kv.second.GatherAccessTimes(Bucket);
+ Kv.second->GatherAccessTimes(Bucket);
}
}
@@ -505,7 +570,7 @@ ZenCacheMemoryLayer::TotalSize() const
for (auto& Kv : m_Buckets)
{
- TotalSize += Kv.second.TotalSize();
+ TotalSize += Kv.second->TotalSize();
}
return TotalSize;
@@ -570,9 +635,16 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed);
}
+void
+ZenCacheMemoryLayer::CacheBucket::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_BucketLock);
+ m_CacheMap.clear();
+}
+
//////////////////////////////////////////////////////////////////////////
-ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName))
+ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero)
{
}
@@ -581,19 +653,6 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket()
}
bool
-ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
-{
- if (std::filesystem::exists(BucketDir))
- {
- DeleteDirectories(BucketDir);
-
- return true;
- }
-
- return false;
-}
-
-void
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate)
{
using namespace std::literals;
@@ -611,7 +670,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
if (Manifest)
{
m_BucketId = Manifest["BucketId"].AsObjectId();
- m_IsOk = m_BucketId != Oid::Zero;
+ if (m_BucketId == Oid::Zero)
+ {
+ return false;
+ }
}
else if (AllowCreate)
{
@@ -625,7 +687,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
else
{
- return;
+ return false;
}
OpenLog(BucketDir, IsNew);
@@ -641,7 +703,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo
}
}
- m_IsOk = true;
+ return true;
}
void
@@ -1144,7 +1206,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
- if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFileWithSharedDelete(DataFilePath.ToPath()))
{
OutValue.Value = Data;
OutValue.Value.SetContentType(Loc.GetContentType());
@@ -1158,11 +1220,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
bool
ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
- if (!m_IsOk)
- {
- return false;
- }
-
RwLock::SharedLockScope _(m_IndexLock);
auto It = m_Index.find(HashKey);
if (It == m_Index.end())
@@ -1184,11 +1241,6 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
void
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
- if (!m_IsOk)
- {
- return;
- }
-
if (Value.Value.Size() >= m_LargeObjectThreshold)
{
return PutStandaloneCacheValue(HashKey, Value);
@@ -1196,12 +1248,24 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue&
PutInlineCacheValue(HashKey, Value);
}
-void
+bool
ZenCacheDiskLayer::CacheBucket::Drop()
{
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+
+ std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks;
+ ShardLocks.reserve(256);
+ for (RwLock& Lock : m_ShardedLocks)
+ {
+ ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock));
+ }
m_BlockStore.Close();
m_SlogFile.Close();
- DeleteDirectories(m_BucketDir);
+
+ bool Deleted = MoveAndDeleteDirectory(m_BucketDir);
+
+ m_Index.clear();
+ return Deleted;
}
void
@@ -1703,7 +1767,8 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx)
for (auto& Kv : m_Buckets)
{
- Kv.second.CollectGarbage(GcCtx);
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.CollectGarbage(GcCtx);
}
}
@@ -1716,7 +1781,7 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac
{
if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end())
{
- CacheBucket& Bucket = It->second;
+ CacheBucket& Bucket = *It->second;
Bucket.UpdateAccessTimes(Kv.second);
}
}
@@ -1766,110 +1831,80 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c
BuildPath(DataFilePath, HashKey);
std::filesystem::path FsPath{DataFilePath.ToPath()};
- // We retry to move the file since it can be held open for read.
- // This happens if the server processes a Get request for the file or
- // if we are busy sending the file upstream
- int RetryCount = 4;
- do
- {
- Ec.clear();
- {
- RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
-
- DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey));
- // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
- // will be disabled as the file handle has already been closed
- CleanUpTempFile = Ec ? true : false;
-
- if (Ec)
- {
- std::error_code ExistingEc;
- uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc);
- if (!ExistingEc && (OldFileSize == NewFileSize))
- {
- ZEN_INFO(
- "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same "
- "value, "
- "move "
- "failed with reason '{}'",
- DataFile.GetPath(),
- FsPath.string(),
- m_BucketDir,
- Ec.message());
- return;
- }
- }
- }
+ // We do a speculative remove of the file instead of probing with a exists call and check the error code instead
+ std::filesystem::remove(FsPath, Ec);
+ if (Ec && Ec.value() != ENOENT)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to replace file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
+ }
- if (!Ec)
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ std::filesystem::path ParentPath = FsPath.parent_path();
+ if (std::filesystem::is_directory(ParentPath))
{
- uint8_t EntryFlags = DiskLocation::kStandaloneFile;
-
- if (Value.Value.GetContentType() == ZenContentType::kCbObject)
- {
- EntryFlags |= DiskLocation::kStructured;
- }
- else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
- {
- EntryFlags |= DiskLocation::kCompressed;
- }
-
- DiskLocation Loc(NewFileSize, EntryFlags);
- IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
-
- uint64_t OldFileSize = 0;
- RwLock::ExclusiveLockScope _(m_IndexLock);
- if (auto It = m_Index.find(HashKey); It == m_Index.end())
- {
- // Previously unknown object
- m_Index.insert({HashKey, Entry});
- }
- else
- {
- // TODO: should check if write is idempotent and bail out if it is?
- OldFileSize = It.value().Location.Size();
- It.value() = Entry;
- }
-
- m_SlogFile.Append({.Key = HashKey, .Location = Loc});
- if (OldFileSize <= NewFileSize)
- {
- m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed);
- }
- else
- {
- m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed);
- }
- return;
+ throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
}
-
- std::filesystem::path ParentPath = FsPath.parent_path();
- if (!std::filesystem::is_directory(ParentPath))
+ Ec.clear();
+ std::filesystem::create_directories(ParentPath, Ec);
+ if (Ec)
{
- Ec.clear();
- std::filesystem::create_directories(ParentPath, Ec);
- if (!Ec)
- {
- // Retry without sleep
- continue;
- }
throw std::system_error(
Ec,
fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir));
}
- ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'",
- DataFile.GetPath().string(),
- FsPath.string(),
- m_BucketDir,
- Ec.message());
+ DataFile.MoveTemporaryIntoPlace(FsPath, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
+ }
+ }
+
+ // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file
+ // will be disabled as the file handle has already been closed
+ CleanUpTempFile = false;
- // Semi arbitrary back-off
- zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds
- } while (RetryCount-- > 0);
+ uint8_t EntryFlags = DiskLocation::kStandaloneFile;
- throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir));
+ if (Value.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ EntryFlags |= DiskLocation::kStructured;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ EntryFlags |= DiskLocation::kCompressed;
+ }
+
+ DiskLocation Loc(NewFileSize, EntryFlags);
+ IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount());
+
+ uint64_t OldFileSize = 0;
+ RwLock::ExclusiveLockScope _(m_IndexLock);
+ if (auto It = m_Index.find(HashKey); It == m_Index.end())
+ {
+ // Previously unknown object
+ m_Index.insert({HashKey, Entry});
+ }
+ else
+ {
+ // TODO: should check if write is idempotent and bail out if it is?
+ OldFileSize = It.value().Location.Size();
+ It.value() = Entry;
+ }
+
+ m_SlogFile.Append({.Key = HashKey, .Location = Loc});
+ if (OldFileSize <= NewFileSize)
+ {
+ m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed);
+ }
+ else
+ {
+ m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed);
+ }
}
void
@@ -1925,11 +1960,11 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
{
RwLock::SharedLockScope _(m_Lock);
- auto it = m_Buckets.find(BucketName);
+ auto It = m_Buckets.find(BucketName);
- if (it != m_Buckets.end())
+ if (It != m_Buckets.end())
{
- Bucket = &it->second;
+ Bucket = It->second.get();
}
}
@@ -1939,24 +1974,27 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
RwLock::ExclusiveLockScope _(m_Lock);
- if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end())
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
- Bucket = &it->second;
+ Bucket = It->second.get();
}
else
{
- auto It = m_Buckets.try_emplace(BucketName, BucketName);
- Bucket = &It.first->second;
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
- Bucket->OpenOrCreate(BucketPath);
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ m_Buckets.erase(BucketName);
+ return false;
+ }
}
}
ZEN_ASSERT(Bucket != nullptr);
-
return Bucket->Get(HashKey, OutValue);
}
@@ -1969,11 +2007,11 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
{
RwLock::SharedLockScope _(m_Lock);
- auto it = m_Buckets.find(BucketName);
+ auto It = m_Buckets.find(BucketName);
- if (it != m_Buckets.end())
+ if (It != m_Buckets.end())
{
- Bucket = &it->second;
+ Bucket = It->second.get();
}
}
@@ -1983,28 +2021,29 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
RwLock::ExclusiveLockScope _(m_Lock);
- if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end())
+ if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
- Bucket = &it->second;
+ Bucket = It->second.get();
}
else
{
- auto It = m_Buckets.try_emplace(BucketName, BucketName);
- Bucket = &It.first->second;
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ Bucket = InsertResult.first->second.get();
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
- Bucket->OpenOrCreate(BucketPath);
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ m_Buckets.erase(BucketName);
+ return;
+ }
}
}
ZEN_ASSERT(Bucket != nullptr);
- if (Bucket->IsOk())
- {
- Bucket->Put(HashKey, Value);
- }
+ Bucket->Put(HashKey, Value);
}
void
@@ -2023,26 +2062,20 @@ ZenCacheDiskLayer::DiscoverBuckets()
// New bucket needs to be created
if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
+ continue;
}
- else
- {
- auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName);
-
- CacheBucket& Bucket = InsertResult.first->second;
- Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
+ CacheBucket& Bucket = *InsertResult.first->second;
- if (Bucket.IsOk())
- {
- ZEN_INFO("Discovered bucket '{}'", BucketName);
- }
- else
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
- m_Buckets.erase(InsertResult.first);
- }
+ m_Buckets.erase(InsertResult.first);
+ continue;
}
+ ZEN_INFO("Discovered bucket '{}'", BucketName);
}
}
@@ -2051,23 +2084,42 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
RwLock::ExclusiveLockScope _(m_Lock);
- auto it = m_Buckets.find(std::string(InBucket));
+ auto It = m_Buckets.find(std::string(InBucket));
- if (it != m_Buckets.end())
+ if (It != m_Buckets.end())
{
- CacheBucket* Bucket = &it->second;
-
- Bucket->Drop();
-
- m_Buckets.erase(it);
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It);
- return true;
+ return Bucket.Drop();
}
+ // Make sure we remove the folder even if we don't know about the bucket
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= std::string(InBucket);
+ return MoveAndDeleteDirectory(BucketPath);
+}
- return CacheBucket::Delete(BucketPath);
+bool
+ZenCacheDiskLayer::Drop()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ std::vector<std::unique_ptr<CacheBucket>> Buckets;
+ Buckets.reserve(m_Buckets.size());
+ while (!m_Buckets.empty())
+ {
+ const auto& It = m_Buckets.begin();
+ CacheBucket& Bucket = *It->second;
+ m_DroppedBuckets.push_back(std::move(It->second));
+ m_Buckets.erase(It->first);
+ if (!Bucket.Drop())
+ {
+ return false;
+ }
+ }
+ return MoveAndDeleteDirectory(m_RootDir);
}
void
@@ -2080,7 +2132,8 @@ ZenCacheDiskLayer::Flush()
Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
- Buckets.push_back(&Kv.second);
+ CacheBucket* Bucket = Kv.second.get();
+ Buckets.push_back(Bucket);
}
}
@@ -2097,7 +2150,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
for (auto& Kv : m_Buckets)
{
- Kv.second.Scrub(Ctx);
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.Scrub(Ctx);
}
}
@@ -2108,7 +2162,8 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx)
for (auto& Kv : m_Buckets)
{
- Kv.second.GatherReferences(GcCtx);
+ CacheBucket& Bucket = *Kv.second;
+ Bucket.GatherReferences(GcCtx);
}
}
@@ -2120,7 +2175,7 @@ ZenCacheDiskLayer::TotalSize() const
for (auto& Kv : m_Buckets)
{
- TotalSize += Kv.second.TotalSize();
+ TotalSize += Kv.second->TotalSize();
}
return TotalSize;
@@ -2130,18 +2185,22 @@ ZenCacheDiskLayer::TotalSize() const
static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc";
-ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc)
+ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration)
+: GcStorage(Gc)
+, GcContributor(Gc)
+, m_Gc(Gc)
+, m_Configuration(Configuration)
{
- CreateDirectories(BasePath);
+ CreateDirectories(m_Configuration.BasePath);
DirectoryContent DirContent;
- GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
+ GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent);
std::vector<std::string> LegacyBuckets;
std::vector<std::string> Namespaces;
for (const std::filesystem::path& DirPath : DirContent.Directories)
{
- std::string DirName = PathToUtf8(DirPath.stem());
+ std::string DirName = PathToUtf8(DirPath.filename());
if (DirName.starts_with(NamespaceDiskPrefix))
{
Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length()));
@@ -2150,7 +2209,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor
LegacyBuckets.push_back(DirName);
}
- ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size());
+ ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size());
if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end())
{
@@ -2158,13 +2217,14 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor
ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName);
- std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
+ std::filesystem::path DefaultNamespaceFolder =
+ m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName);
CreateDirectories(DefaultNamespaceFolder);
// Move any non-namespace folders into the default namespace folder
for (const std::string& DirName : LegacyBuckets)
{
- std::filesystem::path LegacyFolder = BasePath / DirName;
+ std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName;
std::filesystem::path NewPath = DefaultNamespaceFolder / DirName;
std::error_code Ec;
std::filesystem::rename(LegacyFolder, NewPath, Ec);
@@ -2179,7 +2239,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor
for (const std::string& NamespaceName : Namespaces)
{
m_Namespaces[NamespaceName] =
- std::make_unique<ZenCacheNamespace>(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
+ std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName));
}
}
@@ -2216,7 +2276,22 @@ ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket)
{
return Store->DropBucket(Bucket);
}
- ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket);
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket);
+ return false;
+}
+
+bool
+ZenCacheStore::DropNamespace(std::string_view InNamespace)
+{
+ RwLock::SharedLockScope _(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end())
+ {
+ ZenCacheNamespace& Namespace = *It->second;
+ m_DroppedNamespaces.push_back(std::move(It->second));
+ m_Namespaces.erase(It);
+ return Namespace.Drop();
+ }
+ ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace);
return false;
}
@@ -2247,13 +2322,29 @@ ZenCacheStore::GetNamespace(std::string_view Namespace)
return It->second.get();
}
}
- return nullptr;
+ _.ReleaseNow();
+
+ if (!m_Configuration.AllowAutomaticCreationOfNamespaces)
+ {
+ return nullptr;
+ }
+
+ RwLock::ExclusiveLockScope __(m_NamespacesLock);
+ if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end())
+ {
+ return It->second.get();
+ }
+
+ auto NewNamespace = m_Namespaces.insert_or_assign(
+ std::string(Namespace),
+ std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace)));
+ return NewNamespace.first->second.get();
}
void
ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const
{
- std::vector<std::pair<std::string, ZenCacheNamespace&> > Namespaces;
+ std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces;
{
RwLock::SharedLockScope _(m_NamespacesLock);
Namespaces.reserve(m_Namespaces.size());
@@ -3048,40 +3139,210 @@ TEST_CASE("z$.namespaces")
ScopedTemporaryDirectory TempDir;
CreateDirectories(TempDir.Path());
+ IoHash Key1;
+ IoHash Key2;
{
CasGc Gc;
- ZenCacheStore Zcs(Gc, TempDir.Path() / "cache");
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false});
const auto Bucket = "teardrinker"sv;
const auto CustomNamespace = "mynamespace"sv;
// Create a cache record
- const IoHash Key = CreateKey(42);
- CbObject CacheValue = CreateCacheValue(4096);
+ Key1 = CreateKey(42);
+ CbObject CacheValue = CreateCacheValue(4096);
IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue);
+ Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue);
ZenCacheValue GetValue;
- CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue));
+ CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+ // This should just be dropped as we don't allow creating of namespaces on the fly
+ Zcs.Put(CustomNamespace, Bucket, Key1, PutValue);
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ }
- // This should just be dropped for now until we decide how we add namespaces
- Zcs.Put(CustomNamespace, Bucket, Key, PutValue);
- CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue));
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket = "teardrinker"sv;
+ const auto CustomNamespace = "mynamespace"sv;
- const IoHash Key2 = CreateKey(43);
- CbObject CacheValue2 = CreateCacheValue(4096);
+ Key2 = CreateKey(43);
+ CbObject CacheValue2 = CreateCacheValue(4096);
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2);
+ ZenCacheValue GetValue;
CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
+ CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
+ CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue));
+ CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue));
+ }
+}
+
+TEST_CASE("z$.drop.bucket")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ IoHash Key1;
+ IoHash Key2;
+
+ auto PutValue =
+ [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket = "teardrinker"sv;
+ const auto Namespace = "mynamespace"sv;
+
+ Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096);
+ Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(Value1.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropBucket(Namespace, Bucket));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace, Bucket, Key1);
+ CHECK(!Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2);
+ CHECK(!Value2.Value);
+ }
+}
+
+TEST_CASE("z$.drop.namespace")
+{
+ using namespace testutils;
+
+ const auto CreateCacheValue = [](size_t Size) -> CbObject {
+ std::vector<uint8_t> Buf;
+ Buf.resize(Size);
+
+ CbObjectWriter Writer;
+ Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
+ return Writer.Save();
+ };
+
+ ScopedTemporaryDirectory TempDir;
+ CreateDirectories(TempDir.Path());
+
+ auto PutValue =
+ [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) {
+ // Create a cache record
+ IoHash Key = CreateKey(KeyIndex);
+ CbObject CacheValue = CreateCacheValue(Size);
+
+ IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer();
+ Buffer.SetContentType(ZenContentType::kCbObject);
+
+ ZenCacheValue PutValue = {.Value = Buffer};
+ Zcs.Put(Namespace, Bucket, Key, PutValue);
+ return Key;
+ };
+ auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
+ ZenCacheValue GetValue;
+ Zcs.Get(Namespace, Bucket, Key, GetValue);
+ return GetValue;
+ };
+ WorkerThreadPool Workers(1);
+ {
+ CasGc Gc;
+ ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true});
+ const auto Bucket1 = "teardrinker1"sv;
+ const auto Bucket2 = "teardrinker2"sv;
+ const auto Namespace1 = "mynamespace1"sv;
+ const auto Namespace2 = "mynamespace2"sv;
+
+ IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096);
+ IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048);
+ IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096);
+ IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048);
+
+ ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(Value1.Value);
+ ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(Value2.Value);
+ ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
+
+ std::atomic_bool WorkComplete = false;
+ Workers.ScheduleWork([&]() {
+ zen::Sleep(100);
+ Value1.Value = IoBuffer{};
+ Value2.Value = IoBuffer{};
+ Value3.Value = IoBuffer{};
+ Value4.Value = IoBuffer{};
+ WorkComplete = true;
+ });
+ // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket
+ // Our DropBucket execution blocks any incoming request from completing until we are done with the drop
+ CHECK(Zcs.DropNamespace(Namespace1));
+ while (!WorkComplete)
+ {
+ zen::Sleep(1);
+ }
+
+ // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty
+ Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1);
+ CHECK(!Value1.Value);
+ Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2);
+ CHECK(!Value2.Value);
+ Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3);
+ CHECK(Value3.Value);
+ Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4);
+ CHECK(Value4.Value);
}
}
@@ -3093,7 +3354,7 @@ TEST_CASE("z$.blocked.disklayer.put")
const auto CreateCacheValue = [](size_t Size) -> CbObject {
std::vector<uint8_t> Buf;
- Buf.resize(Size);
+ Buf.resize(Size, Size & 0xff);
CbObjectWriter Writer;
Writer.AddBinary("Binary"sv, Buf.data(), Buf.size());
@@ -3115,25 +3376,26 @@ TEST_CASE("z$.blocked.disklayer.put")
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
- // Overwriting with a value of same size should go fine
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer});
-
CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1);
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
-# if ZEN_PLATFORM_WINDOWS
- // On Windows platform, overwriting with different size while we have
- // it open for read should throw exception if file is held open
- CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}));
-# else
- // Other platforms should handle overwrite just fine
+
+ // We should be able to overwrite even if the file is open for read
Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
-# endif
- BufferGet = ZenCacheValue{};
+ MemoryView OldView = BufferGet.Value.GetView();
- // Read access has been removed, we should now be able to overwrite it
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2});
+ ZenCacheValue BufferGet2;
+ CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2));
+ MemoryView NewView = BufferGet2.Value.GetView();
+
+ // Make sure file openend for read before we wrote it still have old data
+ CHECK(OldView.GetSize() == Buffer.GetSize());
+ CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0);
+
+ // Make sure we get the new data when reading after we write new data
+ CHECK(NewView.GetSize() == Buffer2.GetSize());
+ CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0);
}
#endif
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 34b8d18f4..c226074fe 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -150,6 +150,7 @@ public:
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
bool DropBucket(std::string_view Bucket);
void Scrub(ScrubContext& Ctx);
void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes);
@@ -193,14 +194,16 @@ private:
bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
void Scrub(ScrubContext& Ctx);
void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
inline uint64_t TotalSize() const { return m_TotalSize; }
};
- mutable RwLock m_Lock;
- std::unordered_map<std::string, CacheBucket> m_Buckets;
- Configuration m_Configuration;
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
+ std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
+ Configuration m_Configuration;
ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete;
ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete;
@@ -214,6 +217,7 @@ public:
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ bool Drop();
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
@@ -233,19 +237,16 @@ private:
CacheBucket(std::string BucketName);
~CacheBucket();
- void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
- static bool Delete(std::filesystem::path BucketDir);
- bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const IoHash& HashKey, const ZenCacheValue& Value);
- void Drop();
- void Flush();
- void SaveManifest();
- void Scrub(ScrubContext& Ctx);
- void GatherReferences(GcContext& GcCtx);
- void CollectGarbage(GcContext& GcCtx);
- void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
+ bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool Drop();
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void GatherReferences(GcContext& GcCtx);
+ void CollectGarbage(GcContext& GcCtx);
+ void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes);
- inline bool IsOk() const { return m_IsOk; }
inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); }
private:
@@ -257,7 +258,6 @@ private:
std::filesystem::path m_BlocksBasePath;
BlockStore m_BlockStore;
Oid m_BucketId;
- bool m_IsOk = false;
uint64_t m_LargeObjectThreshold = 64 * 1024;
// These files are used to manage storage of small objects for this bucket
@@ -292,16 +292,18 @@ private:
std::atomic_uint64_t m_TotalSize{};
- void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
- void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
- void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
- bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
- void MakeIndexSnapshot();
- uint64_t ReadIndexFile();
- uint64_t ReadLog(uint64_t LogPosition);
- uint64_t MigrateLegacyData(bool CleanSource);
- void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
+ void BuildPath(PathBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+ void MakeIndexSnapshot();
+ uint64_t ReadIndexFile();
+ uint64_t ReadLog(uint64_t LogPosition);
+ uint64_t MigrateLegacyData(bool CleanSource);
+ void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew);
+ static bool Delete(std::filesystem::path BucketDir);
+ void SaveManifest();
// These locks are here to avoid contention on file creation, therefore it's sufficient
// that we take the same lock for the same hash
@@ -314,9 +316,10 @@ private:
inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
};
- std::filesystem::path m_RootDir;
- mutable RwLock m_Lock;
- std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive
+ std::filesystem::path m_RootDir;
+ mutable RwLock m_Lock;
+ std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive
+ std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
@@ -330,6 +333,7 @@ public:
bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
+ bool Drop();
bool DropBucket(std::string_view Bucket);
void Flush();
void Scrub(ScrubContext& Ctx);
@@ -360,12 +364,19 @@ public:
"!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given
static constexpr std::string_view NamespaceDiskPrefix = "ns_";
- ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath);
+ struct Configuration
+ {
+ std::filesystem::path BasePath;
+ bool AllowAutomaticCreationOfNamespaces = true;
+ };
+
+ ZenCacheStore(CasGc& Gc, const Configuration& Configuration);
~ZenCacheStore();
bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
+ bool DropNamespace(std::string_view Namespace);
void Flush();
void Scrub(ScrubContext& Ctx);
@@ -377,10 +388,14 @@ private:
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const;
- typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NameSpaceMap;
+ typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap;
+
+ mutable RwLock m_NamespacesLock;
+ NamespaceMap m_Namespaces;
+ std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
- mutable RwLock m_NamespacesLock;
- NameSpaceMap m_Namespaces;
+ CasGc& m_Gc;
+ Configuration m_Configuration;
};
void z$_forcelink();
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index be91ae4f8..c534865dc 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -478,6 +478,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
"Size of gc disk reserve in bytes.",
cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"),
"");
+
+ options.add_option("gc",
+ "",
+ "gc-monitor-interval-seconds",
+ "Garbage collection monitoring interval in seconds.",
+ cxxopts::value<int32_t>(ServerOptions.GcConfig.MonitorIntervalSeconds)->default_value("30"),
+ "");
try
{
auto result = options.parse(argc, argv);
@@ -770,8 +777,9 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
if (sol::optional<sol::table> GcConfig = lua["gc"])
{
- ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
- ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
+ ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30);
+ ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
+ ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28));
if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"])
{
diff --git a/zenserver/config.h b/zenserver/config.h
index 49f039d8d..a07bba9a4 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -96,10 +96,11 @@ struct ZenGcConfig
{
ZenCasEvictionPolicy Cas;
ZenCacheEvictionPolicy Cache;
- int32_t IntervalSeconds = 0;
- bool CollectSmallObjects = true;
- bool Enabled = true;
- uint64_t DiskReserveSize = 1ul << 28;
+ int32_t MonitorIntervalSeconds = 30;
+ int32_t IntervalSeconds = 0;
+ bool CollectSmallObjects = true;
+ bool Enabled = true;
+ uint64_t DiskReserveSize = 1ul << 28;
};
struct ZenServerOptions
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 2ec24b303..38c798569 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -157,8 +157,13 @@ namespace detail {
{
ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks();
- std::scoped_lock Lock(m_TaskMutex);
- if (m_PendingTasks.contains(UpstreamData.TaskId))
+
+ bool AlreadyQueued;
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId);
+ }
+ if (AlreadyQueued)
{
// Pending task is already queued, return success
ApplyResult.Success = true;
@@ -171,7 +176,7 @@ namespace detail {
CloudCacheSession StorageSession(m_StorageClient);
{
- CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs);
+ CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds);
ApplyResult.Bytes += Result.Bytes;
ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks();
@@ -182,6 +187,22 @@ namespace detail {
return ApplyResult;
}
UpstreamData.Blobs.clear();
+ UpstreamData.CasIds.clear();
+ }
+
+ {
+ CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids);
+ ApplyResult.Bytes += Result.Bytes;
+ ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
+ ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks();
+ if (!Result.Success)
+ {
+ ApplyResult.Error = {
+ .ErrorCode = Result.ErrorCode,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"};
+ return ApplyResult;
+ }
+ UpstreamData.Cids.clear();
}
{
@@ -279,9 +300,11 @@ namespace detail {
}
}
- [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
+ [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session,
+ const std::map<IoHash, IoBuffer>& Blobs,
+ const std::set<IoHash>& CasIds)
{
- if (Blobs.size() == 0)
+ if (Blobs.size() == 0 && CasIds.size() == 0)
{
return {.Success = true};
}
@@ -292,6 +315,7 @@ namespace detail {
// Batch check for missing blobs
std::set<IoHash> Keys;
std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
+ Keys.insert(CasIds.begin(), CasIds.end());
CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
@@ -310,7 +334,22 @@ namespace detail {
for (const auto& Hash : ExistsResult.Needs)
{
- CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash));
+ IoBuffer DataBuffer;
+ if (Blobs.contains(Hash))
+ {
+ DataBuffer = Blobs.at(Hash);
+ }
+ else
+ {
+ DataBuffer = m_CasStore.FindChunk(Hash);
+ if (!DataBuffer)
+ {
+ Log().warn("Put blob FAILED, input chunk '{}' missing", Hash);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"};
+ }
+ }
+
+ CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -326,6 +365,62 @@ namespace detail {
return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
}
+ [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids)
+ {
+ if (Cids.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing compressed blobs
+ CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids);
+ Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}",
+ Cids.size(),
+ ExistsResult.Needs.size(),
+ ExistsResult.ElapsedSeconds,
+ ExistsResult.Success);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (!ExistsResult.Success)
+ {
+ return {
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
+ .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"};
+ }
+
+ for (const auto& Hash : ExistsResult.Needs)
+ {
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash);
+ if (!DataBuffer)
+ {
+ Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash);
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"};
+ }
+
+ CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
+ Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}",
+ Hash,
+ Result.Bytes,
+ Result.ElapsedSeconds,
+ Result.Success);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
[[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
{
if (Objects.size() == 0)
@@ -599,6 +694,8 @@ namespace detail {
{
std::map<IoHash, IoBuffer> Blobs;
std::map<IoHash, CbObject> Objects;
+ std::set<IoHash> CasIds;
+ std::set<IoHash> Cids;
IoHash TaskId;
IoHash RequirementsId;
};
@@ -957,7 +1054,7 @@ namespace detail {
for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
{
CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
{
return false;
}
@@ -966,7 +1063,7 @@ namespace detail {
for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
{
CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
{
return false;
}
@@ -1034,11 +1131,10 @@ namespace detail {
bool AnyErrors = false;
ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+ const IoHash Cid = Field.AsHash();
+ const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
- if (!DataBuffer)
+ if (!m_CidStore.ContainsChunk(Cid))
{
Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
AnyErrors = true;
@@ -1050,11 +1146,9 @@ namespace detail {
return;
}
- const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
-
InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = CompressedId;
- Data.Blobs[CompressedId] = std::move(DataBuffer);
+ InputFileHashes[FilePath] = Cid;
+ Data.Cids.insert(Cid);
});
if (AnyErrors)
@@ -1067,7 +1161,7 @@ namespace detail {
const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
- CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
+ CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects);
const IoHash SandboxHash = Sandbox.GetHash();
Data.Objects[SandboxHash] = std::move(Sandbox);
@@ -1118,28 +1212,18 @@ namespace detail {
[[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
std::set<std::filesystem::path>& InputFiles,
std::map<std::filesystem::path, IoHash>& InputFileHashes,
- std::map<IoHash, IoBuffer>& Blobs)
+ std::set<IoHash>& CasIds)
{
- const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
- const IoHash ChunkId = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
- IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
+ const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
+ const IoHash ChunkId = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
- if (!DataBuffer)
+ if (!m_CasStore.ContainsChunk(ChunkId))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
return false;
}
- if (DataBuffer.Size() != Size)
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
- ChunkId,
- DataBuffer.Size(),
- Size);
- return false;
- }
-
if (InputFiles.contains(FilePath))
{
Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
@@ -1148,7 +1232,7 @@ namespace detail {
InputFiles.insert(FilePath);
InputFileHashes[FilePath] = ChunkId;
- Blobs[ChunkId] = std::move(DataBuffer);
+ CasIds.insert(ChunkId);
return true;
}
@@ -1204,7 +1288,7 @@ namespace detail {
[[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
const std::map<std::filesystem::path, IoHash>& InputFileHashes,
- const std::map<IoHash, IoBuffer>& Blobs,
+ const std::set<IoHash>& Cids,
std::map<IoHash, CbObject>& Objects)
{
CbObjectWriter DirectoryTreeWriter;
@@ -1214,14 +1298,13 @@ namespace detail {
DirectoryTreeWriter.BeginArray("f"sv);
for (const auto& File : RootDirectory.Files)
{
- const std::filesystem::path FilePath = {RootDirectory.Path / File};
- const IoHash& FileHash = InputFileHashes.at(FilePath);
- const uint64_t FileSize = Blobs.at(FileHash).Size();
+ const std::filesystem::path FilePath = {RootDirectory.Path / File};
+ const IoHash& FileHash = InputFileHashes.at(FilePath);
+ const bool Compressed = Cids.contains(FileHash);
DirectoryTreeWriter.BeginObject();
DirectoryTreeWriter.AddString("n"sv, File);
DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
- DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
- // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
+ DirectoryTreeWriter.AddBool("c"sv, Compressed);
DirectoryTreeWriter.EndObject();
}
DirectoryTreeWriter.EndArray();
@@ -1232,7 +1315,7 @@ namespace detail {
DirectoryTreeWriter.BeginArray("d"sv);
for (const auto& Item : RootDirectory.Directories)
{
- CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
+ CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects);
const IoHash DirectoryHash = Directory.GetHash();
Objects[DirectoryHash] = std::move(Directory);
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index efc75b5b4..0237ec346 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -408,14 +408,15 @@ ZenStructuredCacheSession::CheckHealth()
}
ZenCacheResult
-ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
+ZenStructuredCacheSession::GetCacheRecord(std::string_view, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/";
- if (Namespace != ZenCacheStore::DefaultNamespace)
- {
- Uri << Namespace << "/";
- }
+ // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support
+ // if (Namespace != ZenCacheStore::DefaultNamespace)
+ // {
+ // Uri << Namespace << "/";
+ // }
Uri << BucketId << "/" << Key.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -437,17 +438,15 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::strin
}
ZenCacheResult
-ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- const IoHash& ValueContentId)
+ZenStructuredCacheSession::GetCacheValue(std::string_view, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId)
{
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/";
- if (Namespace != ZenCacheStore::DefaultNamespace)
- {
- Uri << Namespace << "/";
- }
+ // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support
+ // if (Namespace != ZenCacheStore::DefaultNamespace)
+ // {
+ // Uri << Namespace << "/";
+ // }
Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -466,11 +465,15 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace,
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = Buffer,
+ .Bytes = Response.downloaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
}
ZenCacheResult
-ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
+ZenStructuredCacheSession::PutCacheRecord(std::string_view,
std::string_view BucketId,
const IoHash& Key,
IoBuffer Value,
@@ -478,10 +481,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
{
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/";
- if (Namespace != ZenCacheStore::DefaultNamespace)
- {
- Uri << Namespace << "/";
- }
+ // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support
+ // if (Namespace != ZenCacheStore::DefaultNamespace)
+ // {
+ // Uri << Namespace << "/";
+ // }
Uri << BucketId << "/" << Key.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -501,13 +505,12 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- return {.Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
}
ZenCacheResult
-ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
+ZenStructuredCacheSession::PutCacheValue(std::string_view,
std::string_view BucketId,
const IoHash& Key,
const IoHash& ValueContentId,
@@ -515,10 +518,11 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
{
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/";
- if (Namespace != ZenCacheStore::DefaultNamespace)
- {
- Uri << Namespace << "/";
- }
+ // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support
+ // if (Namespace != ZenCacheStore::DefaultNamespace)
+ // {
+ // Uri << Namespace << "/";
+ // }
Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString();
cpr::Session& Session = m_SessionState->GetSession();
@@ -535,9 +539,8 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
}
- return {.Bytes = Response.uploaded_bytes,
- .ElapsedSeconds = Response.elapsed,
- .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ const bool Success = Response.status_code == 200 || Response.status_code == 201;
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success};
}
ZenCacheResult
@@ -566,7 +569,11 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
}
ZenCacheResult
@@ -594,7 +601,11 @@ ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+ return {.Response = std::move(Buffer),
+ .Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Reason = Response.reason,
+ .Success = Success};
}
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 53a046f80..4db69c265 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -364,6 +364,7 @@ public:
ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
zen::GcSchedulerConfig GcConfig{
.RootDirectory = m_DataRoot / "gc",
+ .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
.Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds),
.MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds),
.CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects,
@@ -754,7 +755,9 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
using namespace std::literals;
ZEN_INFO("instantiating structured cache service");
- m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache");
+ m_CacheStore = std::make_unique<ZenCacheStore>(
+ m_CasGc,
+ ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true});
const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
@@ -1161,7 +1164,7 @@ test_main(int argc, char** argv)
zen::MaximizeOpenFileCount();
- return ZEN_RUN_TESTS(argc, argv);
+ return ZEN_RUN_TESTS(argc, argv);
}
#endif
diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp
index 8b3d6c648..00c1136b6 100644
--- a/zenstore-test/zenstore-test.cpp
+++ b/zenstore-test/zenstore-test.cpp
@@ -25,7 +25,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[])
zen::logging::InitializeLogging();
zen::MaximizeOpenFileCount();
- return ZEN_RUN_TESTS(argc, argv);
+ return ZEN_RUN_TESTS(argc, argv);
#else
return 0;
#endif
diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp
index d490678b5..4e61c23cf 100644
--- a/zenstore/blockstore.cpp
+++ b/zenstore/blockstore.cpp
@@ -225,23 +225,27 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, Writ
{
m_WriteBlock = nullptr;
}
+
+ if (m_ChunkBlocks.size() == m_MaxBlockCount)
{
- if (m_ChunkBlocks.size() == m_MaxBlockCount)
- {
- throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
- }
- WriteBlockIndex += IsWriting ? 1 : 0;
- while (m_ChunkBlocks.contains(WriteBlockIndex))
- {
- WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
- }
- std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
- m_WriteBlock = new BlockStoreFile(BlockPath);
- m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock;
- m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
+ throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath));
}
+
+ WriteBlockIndex += IsWriting ? 1 : 0;
+ while (m_ChunkBlocks.contains(WriteBlockIndex))
+ {
+ WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1);
+ }
+
+ std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex);
+
+ Ref<BlockStoreFile> NewBlockFile = new BlockStoreFile(BlockPath);
+ NewBlockFile->Create(m_MaxBlockSize);
+
+ m_ChunkBlocks[WriteBlockIndex] = NewBlockFile;
+ m_WriteBlock = NewBlockFile;
+ m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release);
m_CurrentInsertOffset = 0;
- m_WriteBlock->Create(m_MaxBlockSize);
}
uint64_t InsertOffset = m_CurrentInsertOffset;
m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment);
@@ -268,6 +272,10 @@ BlockStore::GetReclaimSnapshotState()
{
State.m_ActiveWriteBlocks.insert(BlockIndex);
}
+ if (m_WriteBlock)
+ {
+ State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex);
+ }
State.BlockCount = m_ChunkBlocks.size();
return State;
}
@@ -911,12 +919,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint
#if ZEN_WITH_TESTS
-static bool
-operator==(const BlockStoreLocation& Lhs, const BlockStoreLocation& Rhs)
-{
- return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size;
-}
-
TEST_CASE("blockstore.blockstoredisklocation")
{
BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0};
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index c277359bd..65f959a0e 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -1894,8 +1894,12 @@ TEST_CASE("compactcas.threadedinsert")
}
}
-TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true))
+TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true))
{
+ if (true)
+ {
+ return;
+ }
const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas";
std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs");
std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs");
diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp
index 4b50668d9..8e2d441f8 100644
--- a/zenstore/gc.cpp
+++ b/zenstore/gc.cpp
@@ -343,7 +343,7 @@ GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc)
GcStorage::~GcStorage()
{
- m_Gc.AddGcStorage(this);
+ m_Gc.RemoveGcStorage(this);
}
//////////////////////////////////////////////////////////////////////////
@@ -373,6 +373,7 @@ CasGc::RemoveGcContributor(GcContributor* Contributor)
void
CasGc::AddGcStorage(GcStorage* Storage)
{
+ ZEN_ASSERT(Storage != nullptr);
RwLock::ExclusiveLockScope _(m_Lock);
m_GcStorage.push_back(Storage);
}
diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h
index 34c475fb6..000395fb9 100644
--- a/zenstore/include/zenstore/blockstore.h
+++ b/zenstore/include/zenstore/blockstore.h
@@ -64,6 +64,8 @@ struct BlockStoreDiskLocation
inline uint64_t GetSize() const { return m_Size; }
+ inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default;
+
private:
inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size)
{