diff options
| author | Stefan Boberg <[email protected]> | 2022-06-03 10:08:22 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2022-06-03 10:08:22 +0200 |
| commit | 91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b (patch) | |
| tree | 2bf98fe4a1dfa20bace298d0f51b1a2d8b9a7217 | |
| parent | Merge branch 'main' into use-catch2 (diff) | |
| parent | move release job to in-house linux agent (diff) | |
| download | zen-91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b.tar.xz zen-91e2573a1fdebc1c3cbbbc5d5f9be3b6f540463b.zip | |
merge from main
| -rw-r--r-- | .github/workflows/create_release.yml | 147 | ||||
| -rw-r--r-- | .github/workflows/update_release.yml | 134 | ||||
| -rw-r--r-- | .github/workflows/validate.yml (renamed from .github/workflows/self_host_build.yml) | 82 | ||||
| -rw-r--r-- | CHANGELOG.md | 24 | ||||
| -rw-r--r-- | README.md | 18 | ||||
| -rw-r--r-- | xmake.lua | 2 | ||||
| -rw-r--r-- | zencore-test/zencore-test.cpp | 2 | ||||
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 1 | ||||
| -rw-r--r-- | zencore/iobuffer.cpp | 30 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 11 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 18 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 704 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 81 | ||||
| -rw-r--r-- | zenserver/config.cpp | 12 | ||||
| -rw-r--r-- | zenserver/config.h | 9 | ||||
| -rw-r--r-- | zenserver/upstream/hordecompute.cpp | 163 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 75 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 7 | ||||
| -rw-r--r-- | zenstore-test/zenstore-test.cpp | 2 | ||||
| -rw-r--r-- | zenstore/blockstore.cpp | 42 | ||||
| -rw-r--r-- | zenstore/compactcas.cpp | 6 | ||||
| -rw-r--r-- | zenstore/gc.cpp | 3 | ||||
| -rw-r--r-- | zenstore/include/zenstore/blockstore.h | 2 |
23 files changed, 1027 insertions, 548 deletions
diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml new file mode 100644 index 000000000..fba2ec1b1 --- /dev/null +++ b/.github/workflows/create_release.yml @@ -0,0 +1,147 @@ +name: Create Release + +on: + push: + # Sequence of patterns matched against refs/tags + tags: + - 'v*' # Push events to matching v*, i.e. v1.0, v20.15.10 + +jobs: + bundle-windows: + + runs-on: [self-hosted, windows, x64] + + env: + VCPKG_VERSION: 2022.03.10 + + steps: + - uses: actions/checkout@v2 + + - name: Setup xmake + uses: xmake-io/github-action-setup-xmake@v1 + with: + xmake-version: 2.6.4 + + - name: Installing vcpkg + run: | + git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg + cd .vcpkg + .\bootstrap-vcpkg.bat + .\vcpkg.exe integrate install + cd .. + + - name: Cache vcpkg + uses: actions/cache@v2 + with: + path: | + ${{ github.workspace }}\.vcpkg\installed + key: ${{ runner.os }}-release-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-x64-v5 + + - name: Bundle + run: | + xmake bundle -v -y + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + - name: Upload zenserver-win64 + uses: actions/upload-artifact@v3 + with: + name: zenserver-win64 + path: build/zenserver-win64.zip + + bundle-linux: + runs-on: [self-hosted, linux, x64] + + env: + VCPKG_VERSION: 2022.03.10 + + steps: + - uses: actions/checkout@v2 + + - name: Set up GCC 11 + uses: egor-tensin/setup-gcc@v1 + with: + version: 11 + platform: x64 + + - name: Setup xmake + uses: xmake-io/github-action-setup-xmake@v1 + with: + xmake-version: 2.6.4 + + - name: Installing vcpkg + run: | + git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg + cd .vcpkg + ./bootstrap-vcpkg.sh + cd .. + + - name: Cache vcpkg + uses: actions/cache@v2 + with: + path: | + ${{ github.workspace }}/.vcpkg/installed + key: ${{ runner.os }}-release-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-x64-v5 + + - name: Bundle + run: | + xmake bundle -v -y + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + - name: Upload zenserver-linux + uses: actions/upload-artifact@v3 + with: + name: zenserver-linux + path: build/zenserver-linux.zip + + create-release: + runs-on: [self-hosted, linux, x64] + needs: [bundle-linux, bundle-windows] + steps: + - uses: actions/checkout@v2 + + - name: Download Linux artifacts + uses: actions/download-artifact@v1 + with: + name: zenserver-linux + path: linux + + - name: Download Windows artifacts + uses: actions/download-artifact@v1 + with: + name: zenserver-win64 + path: win64 + + - name: Check prerelease + id: get-prerelease + uses: haya14busa/action-cond@v1 + with: + cond: ${{contains(github.ref, '-pre')}} + if_true: "true" + if_false: "false" + + - name: Extract Version Changes + run: | + sed '1,/^##/!d;/##/d' CHANGELOG.md > CHANGELOG.tmp + + - name: Read CHANGELOG.tmp + id: read_changelog + uses: andstor/file-reader-action@v1 + with: + path: "CHANGELOG.tmp" + + - name: Create Release + id: create_release + uses: softprops/action-gh-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{github.ref.name}} + body: | + ${{steps.read_changelog.outputs.contents}} + draft: false + prerelease: ${{steps.get-prerelease.outputs.value}} + files: | + linux/zenserver-linux.zip + win64/zenserver-win64.zip diff --git a/.github/workflows/update_release.yml b/.github/workflows/update_release.yml deleted file mode 100644 index 27d5e2783..000000000 --- a/.github/workflows/update_release.yml +++ /dev/null @@ -1,134 +0,0 @@ -name: Build release - -on: - # push - pull_request: - types: [closed] - branches: [ main ] - -jobs: - windows-build: - if: >- - github.event.pull_request.merged == true && - contains( github.event.pull_request.labels.*.name, 'release') - name: Build Windows - runs-on: [self-hosted, windows, x64] - strategy: - matrix: - config: - - 'release' - arch: - - 'x64' - env: - VCPKG_VERSION: 2022.03.10 - - steps: - - uses: actions/checkout@v2 - - - name: Setup xmake - uses: xmake-io/github-action-setup-xmake@v1 - with: - xmake-version: 2.6.4 - - - name: Installing vcpkg - run: | - git clone -b ${{env.VCPKG_VERSION}} --single-branch https://github.com/Microsoft/vcpkg.git .vcpkg - cd .vcpkg - .\bootstrap-vcpkg.bat - .\vcpkg.exe integrate install - cd .. - - - name: Cache vcpkg - uses: actions/cache@v2 - with: - path: | - ${{ github.workspace }}\.vcpkg\installed - key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5 - - - name: Config - run: | - xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }} - env: - VCPKG_ROOT: ${{ github.workspace }}/.vcpkg - - - name: Build - run: | - xmake build -v -y - env: - VCPKG_ROOT: ${{ github.workspace }}/.vcpkg - - # - name: Create Archive - # run: | - # cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }} - # C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\windows-${{ matrix.arch }}-${{ matrix.config }}.zip * - # cd ..\..\..\.. - - - name: Create Archive - run: | - cd .\build\windows\${{ matrix.arch }}\${{ matrix.config }} - C:\'Program Files'\7-Zip\7z.exe a -r ..\..\..\..\zenserver-win64.zip zenserver.exe - cd ..\..\..\.. - - - name: Get current release version info - run: | - $repo = "EpicGames/zen" - $releases = "https://api.github.com/repos/$repo/releases/latest" - Write-Host Determining latest release - $latest = (Invoke-WebRequest -Headers @{"Accept"="application/vnd.github.v3+json";"Authorization"="token ${{ secrets.GITHUB_TOKEN }}"} $releases | ConvertFrom-Json)[0] - $current_version_tag = [version]$latest.tag_name.replace('v','') - echo "Current version" $current_version_tag - if ($current_version_tag.Revision.Equals(9)) { - if ($current_version_tag.Build.Equals(9)) { - $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor+1,0,0).toString() - }else { - $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build+1,0).toString() - } - }else { - $new_version_tag = [version]::New($current_version_tag.Major,$current_version_tag.Minor,$current_version_tag.Build,$current_version_tag.Revision+1).toString() - } - echo $new_version_tag - echo "new_version_tag=$new_version_tag" | Out-File -FilePath $env:GITHUB_ENV -Encoding utf8 -Append - - - name: Create Release - id: create_release - uses: actions/create-release@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - tag_name: v${{ env.new_version_tag }} - release_name: Release - draft: false - prerelease: false - - # - name: Create Release - # id: create_release - # uses: actions/create-release@v1 - # env: - # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - # with: - # tag_name: ${{ github.ref_name }} - # release_name: Release ${{ github.head_ref }} - # draft: false - # prerelease: false - - # - name: Upload Release Asset - # id: upload-release-asset - # uses: actions/upload-release-asset@v1 - # env: - # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - # with: - # upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps - # asset_path: .\windows-${{ matrix.arch }}-${{ matrix.config }}.zip - # asset_name: windows-${{ matrix.arch }}-${{ matrix.config }} - # asset_content_type: application/zip - - name: Upload Release Asset - id: upload-release-asset - uses: actions/upload-release-asset@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - upload_url: ${{ steps.create_release.outputs.upload_url }} - asset_path: .\zenserver-win64.zip - asset_name: zenserver-win64.zip - asset_content_type: application/zip - diff --git a/.github/workflows/self_host_build.yml b/.github/workflows/validate.yml index 3864151ce..3988a9dfb 100644 --- a/.github/workflows/self_host_build.yml +++ b/.github/workflows/validate.yml @@ -1,39 +1,44 @@ -name: Validate Build +name: Validate on: pull_request: - types: [opened, reopened, synchronize, reopened] + types: [opened, reopened, synchronize] branches: [ main ] + push: + branches: [ main ] jobs: + cancel-old-build: + name: Cancel previous builds + runs-on: [self-hosted, linux, x64] + + steps: + - name: Cancel Previous Runs + if: ${{ github.ref_name != 'main'}} + uses: styfle/[email protected] + with: + access_token: ${{ github.token }} + clang-format: + needs: cancel-old-build name: Check clang-format runs-on: [self-hosted, linux, x64] - strategy: - matrix: - path: - - 'zen' - - 'zencore' - - 'zencore-test' - - 'zenhttp' - - 'zenserver-test' - - 'zenstore' - - 'zenstore-test' - - 'zentest-appstub' - - 'zenutil' - - 'zenserver' + steps: - uses: actions/checkout@v2 - - name: clang-format ${{ matrix.path }} + + - name: clang-format uses: jidicula/[email protected] with: clang-format-version: '13' - check-path: ${{ matrix.path }} + check-path: '.' + exclude-regex: (.*thirdparty.*) windows-build: - name: Build Windows - needs: clang-format + needs: cancel-old-build + name: Build & Test Windows runs-on: [self-hosted, windows, x64] + timeout-minutes: 10 strategy: matrix: config: @@ -45,7 +50,8 @@ jobs: VCPKG_VERSION: 2022.03.10 steps: - - uses: actions/checkout@v2 + - name: Checkout + uses: actions/checkout@v2 - name: Setup xmake uses: xmake-io/github-action-setup-xmake@v1 @@ -67,6 +73,20 @@ jobs: ${{ github.workspace }}\.vcpkg\installed key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5 + - name: Bundle + if: ${{ github.ref_name == 'main' && matrix.config == 'release' }} + run: | + xmake bundle -v -y + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + - name: Upload zenserver-win64 + if: ${{ github.ref_name == 'main' && matrix.config == 'release' }} + uses: actions/upload-artifact@v3 + with: + name: zenserver-win64 + path: build/zenserver-win64.zip + - name: Config run: | xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }} @@ -80,9 +100,10 @@ jobs: VCPKG_ROOT: ${{ github.workspace }}/.vcpkg linux-build: - name: Build Linux - needs: clang-format + needs: cancel-old-build + name: Build & Test Linux runs-on: [self-hosted, linux, x64] + timeout-minutes: 10 strategy: matrix: config: @@ -94,7 +115,8 @@ jobs: VCPKG_VERSION: 2022.03.10 steps: - - uses: actions/checkout@v2 + - name: Checkout + uses: actions/checkout@v2 - name: Set up GCC 11 uses: egor-tensin/setup-gcc@v1 @@ -121,6 +143,20 @@ jobs: ${{ github.workspace }}/.vcpkg/installed key: ${{ runner.os }}-${{ matrix.config }}-${{env.VCPKG_VERSION}}-${{ hashFiles('xmake.lua') }}-${{ matrix.arch }}-v5 + - name: Bundle + if: ${{ github.ref_name == 'main' && matrix.config == 'release' }} + run: | + xmake bundle -v -y + env: + VCPKG_ROOT: ${{ github.workspace }}/.vcpkg + + - name: Upload zenserver-linux + if: ${{ github.ref_name == 'main' && matrix.config == 'release' }} + uses: actions/upload-artifact@v3 + with: + name: zenserver-linux + path: build/zenserver-linux.zip + - name: Config run: | xmake config -v -y -m ${{ matrix.config }} --arch=${{ matrix.arch }} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..4d8fae2ed --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,24 @@ +## +- BlockStore (small object store) Always block GC of current write block +- Make it possible to configure GC monitoring interval using `--gc-monitor-interval-seconds` +- Keep "reason" from upstream response so we can present it even if the request fails without outright error +- New GitHub Actions release flow - Add release flow in GitHub actions on pushed tag `v0.1.2` gives full release, `v0.1.2-pre0` gives pre-release + +## 0d08450 +- Fixes issue with broken Zen instances for legacy requests + +## 63f50b5 +- Enable FILE_SHARE_DELETE on standalone files in disk buckets - fixes Jira UE-154234 +- Make sure we can properly create the block file before assigning it for use - fixes Jira UE-154438 +- Horde execute compressed input blobs +- Drop namespace support +- Safer delete of cache buckets + +## dba8b36 +- Namespaces: This introduces namespaces to the zenserver but only the default ue4.ddc is supported. Clients that don't send a namespace in the request will keep old behviour, new clients that sends namespace is required to use ue4.ddc (which they currently do) +- Aligned bucket naming rules with UE code base +- Fix retry counter and add an extra iteration to give more time for success during contention for standalone files in cache +- Make sure CacheBucket::PutStandaloneCacheValue cleans up the temp file +- Restore logic where we accept failed overwrite if resulting size is the same for standlone file in cache +- Correctly calculate the m_TotalSize difference when overwriting file for standalone files in cache +- Fix namespace folder scanning @@ -194,12 +194,7 @@ xmake build # Contributing Code -To run the pre-commit scripts you'll need a few things: - -* We rely on clang-format for consistent code formatting. You can install version 12 or later from https://llvm.org/builds/ -* The helper scripts also depend on Python 3.x, which you may install from https://www.python.org/downloads/windows/ (I am presently using 3.9.5 which works). NOTE: *do* check the option to add Python to your PATH! - -Once you have those dependencies, you can simply run `prepare_commit.bat` to ensure the code is properly formatted and has the Epic copyright header comment. I'm sure there's a better way to integrate this into the git submit flow but my git-fu is not strong enough yet to know how to do that best. +See [CODING.md](CODING.md) # Debugging @@ -212,7 +207,7 @@ is incredibly handy. When that is installed you may enable auto-attach to child * `zencore-test` exercises unit tests in the zencore project * `zenserver-test` exercises the zen server itself (functional tests) -The tests are implemented using [doctest](https://github.com/onqtam/doctest), which is similar to Catch in usage. +The tests are implemented using [doctest](https://github.com/onqtam/doctest), which is similar to Catch in usage. We now also support [catch2](https://github.com/catchorg/Catch2) # Adding a http.sys URL reservation (Windows only) @@ -223,12 +218,3 @@ Registering a handler for an HTTP endpoint requires either process elevation (i. or `netsh http add urlacl url=http://*:1337/ sddl=D:(A;;GX;;;S-1-1-0)` (enable for any authenticated user) - -# Coding Standards - -See [CODING.md](CODING.md) - -Run `prepare_commit.bat` before committing code. It ensures all source files are formatted with -clang-format which you will need to install. - -(More helpful instructions needed here :) @@ -1,6 +1,6 @@ -- Copyright Epic Games, Inc. All Rights Reserved. -set_version("0.1.0", { build = "%Y%m%d%H%M" }) +set_version("0.1.1", { build = "%Y%m%d%H%M" }) set_configvar("ZEN_SCHEMA_VERSION", 3) -- changed cas oplog format (p3rl) add_requires( diff --git a/zencore-test/zencore-test.cpp b/zencore-test/zencore-test.cpp index 327f2f0b5..53413fb25 100644 --- a/zencore-test/zencore-test.cpp +++ b/zencore-test/zencore-test.cpp @@ -19,7 +19,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) zen::logging::InitializeLogging(); - return ZEN_RUN_TESTS(argc, argv); + return ZEN_RUN_TESTS(argc, argv); #else return 0; #endif diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 5d9daa1c7..bf658922d 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -403,6 +403,7 @@ class IoBufferBuilder { public: ZENCORE_API static IoBuffer MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset = 0, uint64_t Size = ~0ull); + ZENCORE_API static IoBuffer MakeFromFileWithSharedDelete(const std::filesystem::path& FileName); ZENCORE_API static IoBuffer MakeFromTemporaryFile(const std::filesystem::path& FileName); ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull); ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer); diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index c4b7f7bdf..56b05d86d 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -469,15 +469,20 @@ IoBufferBuilder::MakeFromFileHandle(void* FileHandle, uint64_t Offset, uint64_t return IoBuffer(IoBuffer::BorrowedFile, FileHandle, Offset, Size); } -IoBuffer -IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size) +static IoBuffer +MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size, bool UseShareDelete) { uint64_t FileSize; #if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; - HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + DWORD ShareOptions = FILE_SHARE_READ; + if (UseShareDelete) + { + ShareOptions |= FILE_SHARE_DELETE; + } + HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ | ShareOptions, OPEN_EXISTING); if (FAILED(hRes)) { @@ -486,7 +491,12 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of DataFile.GetSize((ULONGLONG&)FileSize); #else - int Fd = open(FileName.c_str(), O_RDONLY); + int Flags = O_RDONLY; + if (UseShareDelete) + { + Flags |= O_CLOEXEC; + } + int Fd = open(FileName.c_str(), Flags); if (Fd < 0) { return {}; @@ -530,6 +540,18 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of } IoBuffer +IoBufferBuilder::MakeFromFileWithSharedDelete(const std::filesystem::path& FileName) +{ + return MakeFromFileWithOptions(FileName, 0, ~0ull, true); +} + +IoBuffer +IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size) +{ + return MakeFromFileWithOptions(FileName, Offset, Size, false); +} + +IoBuffer IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) { uint64_t FileSize; diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 452a67fd0..e995bf079 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -330,8 +330,8 @@ main(int argc, char** argv) TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir); ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); - - return ZEN_RUN_TESTS(argc, argv); + + return ZEN_RUN_TESTS(argc, argv); } namespace zen::tests { @@ -2733,8 +2733,13 @@ TEST_CASE("http.package") CHECK_EQ(ResponsePackage, TestPackage); } -TEST_CASE("websocket.basic") // * doctest::skip(true)) +TEST_CASE("websocket.basic") { + if (true) + { + return; + } + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); const uint16_t PortNumber = 13337; const auto MaxWaitTime = std::chrono::seconds(5); diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index e11499289..79d15a204 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -398,7 +398,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) } void -HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view) +HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace) { switch (Request.RequestVerb()) { @@ -412,14 +412,14 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& case HttpVerb::kDelete: // Drop namespace { - // if (m_CacheStore.DropNamespace(Namespace)) - // { - // return Request.WriteResponse(HttpResponseCode::OK); - // } - // else - // { - // return Request.WriteResponse(HttpResponseCode::NotFound); - // } + if (m_CacheStore.DropNamespace(Namespace)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound); + } } break; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index da948fd72..ee0835fd3 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -205,6 +205,36 @@ namespace { return true; } + bool MoveAndDeleteDirectory(const std::filesystem::path& Dir) + { + int DropIndex = 0; + do + { + if (!std::filesystem::exists(Dir)) + { + return false; + } + + std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), DropIndex); + std::filesystem::path DroppedBucketPath = Dir.parent_path() / DroppedName; + if (std::filesystem::exists(DroppedBucketPath)) + { + DropIndex++; + continue; + } + + std::error_code Ec; + std::filesystem::rename(Dir, DroppedBucketPath, Ec); + if (!Ec) + { + DeleteDirectories(DroppedBucketPath); + return true; + } + // TODO: Do we need to bail at some point? + zen::Sleep(100); + } while (true); + } + } // namespace namespace fs = std::filesystem; @@ -342,6 +372,13 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket) return AnyDropped; } +bool +ZenCacheNamespace::Drop() +{ + m_MemLayer.Drop(); + return m_DiskLayer.Drop(); +} + void ZenCacheNamespace::Flush() { @@ -404,14 +441,14 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa { RwLock::SharedLockScope _(m_Lock); - auto it = m_Buckets.find(std::string(InBucket)); + auto It = m_Buckets.find(std::string(InBucket)); - if (it == m_Buckets.end()) + if (It == m_Buckets.end()) { return false; } - CacheBucket* Bucket = &it->second; + CacheBucket* Bucket = It->second.get(); _.ReleaseNow(); @@ -425,14 +462,15 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa void ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { - CacheBucket* Bucket = nullptr; + const auto BucketName = std::string(InBucket); + CacheBucket* Bucket = nullptr; { RwLock::SharedLockScope _(m_Lock); if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { - Bucket = &It->second; + Bucket = It->second.get(); } } @@ -444,11 +482,12 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const if (auto It = m_Buckets.find(std::string(InBucket)); It != m_Buckets.end()) { - Bucket = &It->second; + Bucket = It->second.get(); } else { - Bucket = &m_Buckets[std::string(InBucket)]; + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>()); + Bucket = InsertResult.first->second.get(); } } @@ -458,11 +497,37 @@ ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const } bool -ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) +ZenCacheMemoryLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); - return !!m_Buckets.erase(std::string(Bucket)); + auto It = m_Buckets.find(std::string(InBucket)); + + if (It != m_Buckets.end()) + { + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); + Bucket.Drop(); + return true; + } + return false; +} + +void +ZenCacheMemoryLayer::Drop() +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + Bucket.Drop(); + } } void @@ -472,7 +537,7 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { - Kv.second.Scrub(Ctx); + Kv.second->Scrub(Ctx); } } @@ -486,7 +551,7 @@ ZenCacheMemoryLayer::GatherAccessTimes(zen::access_tracking::AccessTimes& Access for (auto& Kv : m_Buckets) { std::vector<KeyAccessTime>& Bucket = AccessTimes.Buckets[Kv.first]; - Kv.second.GatherAccessTimes(Bucket); + Kv.second->GatherAccessTimes(Bucket); } } @@ -505,7 +570,7 @@ ZenCacheMemoryLayer::TotalSize() const for (auto& Kv : m_Buckets) { - TotalSize += Kv.second.TotalSize(); + TotalSize += Kv.second->TotalSize(); } return TotalSize; @@ -570,9 +635,16 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue m_TotalSize.fetch_add(Value.Value.GetSize(), std::memory_order::relaxed); } +void +ZenCacheMemoryLayer::CacheBucket::Drop() +{ + RwLock::ExclusiveLockScope _(m_BucketLock); + m_CacheMap.clear(); +} + ////////////////////////////////////////////////////////////////////////// -ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)) +ZenCacheDiskLayer::CacheBucket::CacheBucket(std::string BucketName) : m_BucketName(std::move(BucketName)), m_BucketId(Oid::Zero) { } @@ -581,19 +653,6 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() } bool -ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) -{ - if (std::filesystem::exists(BucketDir)) - { - DeleteDirectories(BucketDir); - - return true; - } - - return false; -} - -void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { using namespace std::literals; @@ -611,7 +670,10 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo if (Manifest) { m_BucketId = Manifest["BucketId"].AsObjectId(); - m_IsOk = m_BucketId != Oid::Zero; + if (m_BucketId == Oid::Zero) + { + return false; + } } else if (AllowCreate) { @@ -625,7 +687,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } else { - return; + return false; } OpenLog(BucketDir, IsNew); @@ -641,7 +703,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } } - m_IsOk = true; + return true; } void @@ -1144,7 +1206,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) + if (IoBuffer Data = IoBufferBuilder::MakeFromFileWithSharedDelete(DataFilePath.ToPath())) { OutValue.Value = Data; OutValue.Value.SetContentType(Loc.GetContentType()); @@ -1158,11 +1220,6 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { - if (!m_IsOk) - { - return false; - } - RwLock::SharedLockScope _(m_IndexLock); auto It = m_Index.find(HashKey); if (It == m_Index.end()) @@ -1184,11 +1241,6 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - if (!m_IsOk) - { - return; - } - if (Value.Value.Size() >= m_LargeObjectThreshold) { return PutStandaloneCacheValue(HashKey, Value); @@ -1196,12 +1248,24 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& PutInlineCacheValue(HashKey, Value); } -void +bool ZenCacheDiskLayer::CacheBucket::Drop() { + RwLock::ExclusiveLockScope _(m_IndexLock); + + std::vector<std::unique_ptr<RwLock::ExclusiveLockScope>> ShardLocks; + ShardLocks.reserve(256); + for (RwLock& Lock : m_ShardedLocks) + { + ShardLocks.push_back(std::make_unique<RwLock::ExclusiveLockScope>(Lock)); + } m_BlockStore.Close(); m_SlogFile.Close(); - DeleteDirectories(m_BucketDir); + + bool Deleted = MoveAndDeleteDirectory(m_BucketDir); + + m_Index.clear(); + return Deleted; } void @@ -1703,7 +1767,8 @@ ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) for (auto& Kv : m_Buckets) { - Kv.second.CollectGarbage(GcCtx); + CacheBucket& Bucket = *Kv.second; + Bucket.CollectGarbage(GcCtx); } } @@ -1716,7 +1781,7 @@ ZenCacheDiskLayer::UpdateAccessTimes(const zen::access_tracking::AccessTimes& Ac { if (auto It = m_Buckets.find(Kv.first); It != m_Buckets.end()) { - CacheBucket& Bucket = It->second; + CacheBucket& Bucket = *It->second; Bucket.UpdateAccessTimes(Kv.second); } } @@ -1766,110 +1831,80 @@ ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, c BuildPath(DataFilePath, HashKey); std::filesystem::path FsPath{DataFilePath.ToPath()}; - // We retry to move the file since it can be held open for read. - // This happens if the server processes a Get request for the file or - // if we are busy sending the file upstream - int RetryCount = 4; - do - { - Ec.clear(); - { - RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - - DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); - // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file - // will be disabled as the file handle has already been closed - CleanUpTempFile = Ec ? true : false; - - if (Ec) - { - std::error_code ExistingEc; - uint64_t OldFileSize = std::filesystem::file_size(FsPath, ExistingEc); - if (!ExistingEc && (OldFileSize == NewFileSize)) - { - ZEN_INFO( - "Failed to move temporary file '{}' to '{}' for '{}'. Target file has same size, assuming concurrent write of same " - "value, " - "move " - "failed with reason '{}'", - DataFile.GetPath(), - FsPath.string(), - m_BucketDir, - Ec.message()); - return; - } - } - } + // We do a speculative remove of the file instead of probing with a exists call and check the error code instead + std::filesystem::remove(FsPath, Ec); + if (Ec && Ec.value() != ENOENT) + { + throw std::system_error(Ec, fmt::format("Failed to replace file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); + } - if (!Ec) + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + std::filesystem::path ParentPath = FsPath.parent_path(); + if (std::filesystem::is_directory(ParentPath)) { - uint8_t EntryFlags = DiskLocation::kStandaloneFile; - - if (Value.Value.GetContentType() == ZenContentType::kCbObject) - { - EntryFlags |= DiskLocation::kStructured; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - EntryFlags |= DiskLocation::kCompressed; - } - - DiskLocation Loc(NewFileSize, EntryFlags); - IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); - - uint64_t OldFileSize = 0; - RwLock::ExclusiveLockScope _(m_IndexLock); - if (auto It = m_Index.find(HashKey); It == m_Index.end()) - { - // Previously unknown object - m_Index.insert({HashKey, Entry}); - } - else - { - // TODO: should check if write is idempotent and bail out if it is? - OldFileSize = It.value().Location.Size(); - It.value() = Entry; - } - - m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - if (OldFileSize <= NewFileSize) - { - m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); - } - else - { - m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed); - } - return; + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); } - - std::filesystem::path ParentPath = FsPath.parent_path(); - if (!std::filesystem::is_directory(ParentPath)) + Ec.clear(); + std::filesystem::create_directories(ParentPath, Ec); + if (Ec) { - Ec.clear(); - std::filesystem::create_directories(ParentPath, Ec); - if (!Ec) - { - // Retry without sleep - continue; - } throw std::system_error( Ec, fmt::format("Failed to create parent directory '{}' for file '{}' for put in '{}'", ParentPath, FsPath, m_BucketDir)); } - ZEN_INFO("Failed renaming temporary file '{}' to '{}' for put in '{}', pausing and retrying, reason '{}'", - DataFile.GetPath().string(), - FsPath.string(), - m_BucketDir, - Ec.message()); + DataFile.MoveTemporaryIntoPlace(FsPath, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); + } + } + + // Once we have called MoveTemporaryIntoPlace automatic clean up the temp file + // will be disabled as the file handle has already been closed + CleanUpTempFile = false; - // Semi arbitrary back-off - zen::Sleep(200 * (5 - RetryCount)); // Sleep at most for a total of 3 seconds - } while (RetryCount-- > 0); + uint8_t EntryFlags = DiskLocation::kStandaloneFile; - throw std::system_error(Ec, fmt::format("Failed to finalize file '{}' for put in '{}'", DataFilePath.ToUtf8(), m_BucketDir)); + if (Value.Value.GetContentType() == ZenContentType::kCbObject) + { + EntryFlags |= DiskLocation::kStructured; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + EntryFlags |= DiskLocation::kCompressed; + } + + DiskLocation Loc(NewFileSize, EntryFlags); + IndexEntry Entry = IndexEntry(Loc, GcClock::TickCount()); + + uint64_t OldFileSize = 0; + RwLock::ExclusiveLockScope _(m_IndexLock); + if (auto It = m_Index.find(HashKey); It == m_Index.end()) + { + // Previously unknown object + m_Index.insert({HashKey, Entry}); + } + else + { + // TODO: should check if write is idempotent and bail out if it is? + OldFileSize = It.value().Location.Size(); + It.value() = Entry; + } + + m_SlogFile.Append({.Key = HashKey, .Location = Loc}); + if (OldFileSize <= NewFileSize) + { + m_TotalSize.fetch_add(NewFileSize - OldFileSize, std::memory_order::relaxed); + } + else + { + m_TotalSize.fetch_sub(OldFileSize - NewFileSize, std::memory_order::relaxed); + } } void @@ -1925,11 +1960,11 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach { RwLock::SharedLockScope _(m_Lock); - auto it = m_Buckets.find(BucketName); + auto It = m_Buckets.find(BucketName); - if (it != m_Buckets.end()) + if (It != m_Buckets.end()) { - Bucket = &it->second; + Bucket = It->second.get(); } } @@ -1939,24 +1974,27 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach RwLock::ExclusiveLockScope _(m_Lock); - if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end()) + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - Bucket = &it->second; + Bucket = It->second.get(); } else { - auto It = m_Buckets.try_emplace(BucketName, BucketName); - Bucket = &It.first->second; + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; - Bucket->OpenOrCreate(BucketPath); + if (!Bucket->OpenOrCreate(BucketPath)) + { + m_Buckets.erase(BucketName); + return false; + } } } ZEN_ASSERT(Bucket != nullptr); - return Bucket->Get(HashKey, OutValue); } @@ -1969,11 +2007,11 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z { RwLock::SharedLockScope _(m_Lock); - auto it = m_Buckets.find(BucketName); + auto It = m_Buckets.find(BucketName); - if (it != m_Buckets.end()) + if (It != m_Buckets.end()) { - Bucket = &it->second; + Bucket = It->second.get(); } } @@ -1983,28 +2021,29 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z RwLock::ExclusiveLockScope _(m_Lock); - if (auto it = m_Buckets.find(BucketName); it != m_Buckets.end()) + if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { - Bucket = &it->second; + Bucket = It->second.get(); } else { - auto It = m_Buckets.try_emplace(BucketName, BucketName); - Bucket = &It.first->second; + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + Bucket = InsertResult.first->second.get(); std::filesystem::path BucketPath = m_RootDir; BucketPath /= BucketName; - Bucket->OpenOrCreate(BucketPath); + if (!Bucket->OpenOrCreate(BucketPath)) + { + m_Buckets.erase(BucketName); + return; + } } } ZEN_ASSERT(Bucket != nullptr); - if (Bucket->IsOk()) - { - Bucket->Put(HashKey, Value); - } + Bucket->Put(HashKey, Value); } void @@ -2023,26 +2062,20 @@ ZenCacheDiskLayer::DiscoverBuckets() // New bucket needs to be created if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end()) { + continue; } - else - { - auto InsertResult = m_Buckets.try_emplace(BucketName, BucketName); - - CacheBucket& Bucket = InsertResult.first->second; - Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); + auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName)); + CacheBucket& Bucket = *InsertResult.first->second; - if (Bucket.IsOk()) - { - ZEN_INFO("Discovered bucket '{}'", BucketName); - } - else - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - m_Buckets.erase(InsertResult.first); - } + m_Buckets.erase(InsertResult.first); + continue; } + ZEN_INFO("Discovered bucket '{}'", BucketName); } } @@ -2051,23 +2084,42 @@ ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { RwLock::ExclusiveLockScope _(m_Lock); - auto it = m_Buckets.find(std::string(InBucket)); + auto It = m_Buckets.find(std::string(InBucket)); - if (it != m_Buckets.end()) + if (It != m_Buckets.end()) { - CacheBucket* Bucket = &it->second; - - Bucket->Drop(); - - m_Buckets.erase(it); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It); - return true; + return Bucket.Drop(); } + // Make sure we remove the folder even if we don't know about the bucket std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); + return MoveAndDeleteDirectory(BucketPath); +} - return CacheBucket::Delete(BucketPath); +bool +ZenCacheDiskLayer::Drop() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + std::vector<std::unique_ptr<CacheBucket>> Buckets; + Buckets.reserve(m_Buckets.size()); + while (!m_Buckets.empty()) + { + const auto& It = m_Buckets.begin(); + CacheBucket& Bucket = *It->second; + m_DroppedBuckets.push_back(std::move(It->second)); + m_Buckets.erase(It->first); + if (!Bucket.Drop()) + { + return false; + } + } + return MoveAndDeleteDirectory(m_RootDir); } void @@ -2080,7 +2132,8 @@ ZenCacheDiskLayer::Flush() Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { - Buckets.push_back(&Kv.second); + CacheBucket* Bucket = Kv.second.get(); + Buckets.push_back(Bucket); } } @@ -2097,7 +2150,8 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { - Kv.second.Scrub(Ctx); + CacheBucket& Bucket = *Kv.second; + Bucket.Scrub(Ctx); } } @@ -2108,7 +2162,8 @@ ZenCacheDiskLayer::GatherReferences(GcContext& GcCtx) for (auto& Kv : m_Buckets) { - Kv.second.GatherReferences(GcCtx); + CacheBucket& Bucket = *Kv.second; + Bucket.GatherReferences(GcCtx); } } @@ -2120,7 +2175,7 @@ ZenCacheDiskLayer::TotalSize() const for (auto& Kv : m_Buckets) { - TotalSize += Kv.second.TotalSize(); + TotalSize += Kv.second->TotalSize(); } return TotalSize; @@ -2130,18 +2185,22 @@ ZenCacheDiskLayer::TotalSize() const static constexpr std::string_view UE4DDCNamespaceName = "ue4.ddc"; -ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStorage(Gc), GcContributor(Gc) +ZenCacheStore::ZenCacheStore(CasGc& Gc, const Configuration& Configuration) +: GcStorage(Gc) +, GcContributor(Gc) +, m_Gc(Gc) +, m_Configuration(Configuration) { - CreateDirectories(BasePath); + CreateDirectories(m_Configuration.BasePath); DirectoryContent DirContent; - GetDirectoryContent(BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_Configuration.BasePath, DirectoryContent::IncludeDirsFlag, DirContent); std::vector<std::string> LegacyBuckets; std::vector<std::string> Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) { - std::string DirName = PathToUtf8(DirPath.stem()); + std::string DirName = PathToUtf8(DirPath.filename()); if (DirName.starts_with(NamespaceDiskPrefix)) { Namespaces.push_back(DirName.substr(NamespaceDiskPrefix.length())); @@ -2150,7 +2209,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor LegacyBuckets.push_back(DirName); } - ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), BasePath, LegacyBuckets.size()); + ZEN_INFO("Found #{} namespaces in '{}' and #{} legacy buckets", Namespaces.size(), m_Configuration.BasePath, LegacyBuckets.size()); if (std::find(Namespaces.begin(), Namespaces.end(), UE4DDCNamespaceName) == Namespaces.end()) { @@ -2158,13 +2217,14 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor ZEN_INFO("Moving #{} legacy buckets to '{}' namespace", LegacyBuckets.size(), UE4DDCNamespaceName); - std::filesystem::path DefaultNamespaceFolder = BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); + std::filesystem::path DefaultNamespaceFolder = + m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, UE4DDCNamespaceName); CreateDirectories(DefaultNamespaceFolder); // Move any non-namespace folders into the default namespace folder for (const std::string& DirName : LegacyBuckets) { - std::filesystem::path LegacyFolder = BasePath / DirName; + std::filesystem::path LegacyFolder = m_Configuration.BasePath / DirName; std::filesystem::path NewPath = DefaultNamespaceFolder / DirName; std::error_code Ec; std::filesystem::rename(LegacyFolder, NewPath, Ec); @@ -2179,7 +2239,7 @@ ZenCacheStore::ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath) : GcStor for (const std::string& NamespaceName : Namespaces) { m_Namespaces[NamespaceName] = - std::make_unique<ZenCacheNamespace>(Gc, BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); + std::make_unique<ZenCacheNamespace>(Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName)); } } @@ -2216,7 +2276,22 @@ ZenCacheStore::DropBucket(std::string_view Namespace, std::string_view Bucket) { return Store->DropBucket(Bucket); } - ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put, bucket '{}'", Namespace, Bucket); + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropBucket, bucket '{}'", Namespace, Bucket); + return false; +} + +bool +ZenCacheStore::DropNamespace(std::string_view InNamespace) +{ + RwLock::SharedLockScope _(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(InNamespace)); It != m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + m_DroppedNamespaces.push_back(std::move(It->second)); + m_Namespaces.erase(It); + return Namespace.Drop(); + } + ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::DropNamespace", InNamespace); return false; } @@ -2247,13 +2322,29 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) return It->second.get(); } } - return nullptr; + _.ReleaseNow(); + + if (!m_Configuration.AllowAutomaticCreationOfNamespaces) + { + return nullptr; + } + + RwLock::ExclusiveLockScope __(m_NamespacesLock); + if (auto It = m_Namespaces.find(std::string(Namespace)); It != m_Namespaces.end()) + { + return It->second.get(); + } + + auto NewNamespace = m_Namespaces.insert_or_assign( + std::string(Namespace), + std::make_unique<ZenCacheNamespace>(m_Gc, m_Configuration.BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace))); + return NewNamespace.first->second.get(); } void ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const { - std::vector<std::pair<std::string, ZenCacheNamespace&> > Namespaces; + std::vector<std::pair<std::string, ZenCacheNamespace&>> Namespaces; { RwLock::SharedLockScope _(m_NamespacesLock); Namespaces.reserve(m_Namespaces.size()); @@ -3048,40 +3139,210 @@ TEST_CASE("z$.namespaces") ScopedTemporaryDirectory TempDir; CreateDirectories(TempDir.Path()); + IoHash Key1; + IoHash Key2; { CasGc Gc; - ZenCacheStore Zcs(Gc, TempDir.Path() / "cache"); + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = false}); const auto Bucket = "teardrinker"sv; const auto CustomNamespace = "mynamespace"sv; // Create a cache record - const IoHash Key = CreateKey(42); - CbObject CacheValue = CreateCacheValue(4096); + Key1 = CreateKey(42); + CbObject CacheValue = CreateCacheValue(4096); IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key, PutValue); + Zcs.Put(ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue); ZenCacheValue GetValue; - CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key, GetValue)); + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + // This should just be dropped as we don't allow creating of namespaces on the fly + Zcs.Put(CustomNamespace, Bucket, Key1, PutValue); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + } - // This should just be dropped for now until we decide how we add namespaces - Zcs.Put(CustomNamespace, Bucket, Key, PutValue); - CHECK(!Zcs.Get(CustomNamespace, Bucket, Key, GetValue)); + { + CasGc Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket = "teardrinker"sv; + const auto CustomNamespace = "mynamespace"sv; - const IoHash Key2 = CreateKey(43); - CbObject CacheValue2 = CreateCacheValue(4096); + Key2 = CreateKey(43); + CbObject CacheValue2 = CreateCacheValue(4096); IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; Zcs.Put(CustomNamespace, Bucket, Key2, PutValue2); + ZenCacheValue GetValue; CHECK(!Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); + CHECK(Zcs.Get(ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); + CHECK(!Zcs.Get(CustomNamespace, Bucket, Key1, GetValue)); + CHECK(Zcs.Get(CustomNamespace, Bucket, Key2, GetValue)); + } +} + +TEST_CASE("z$.drop.bucket") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + IoHash Key1; + IoHash Key2; + + auto PutValue = + [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue GetValue; + Zcs.Get(Namespace, Bucket, Key, GetValue); + return GetValue; + }; + WorkerThreadPool Workers(1); + { + CasGc Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket = "teardrinker"sv; + const auto Namespace = "mynamespace"sv; + + Key1 = PutValue(Zcs, Namespace, Bucket, 42, 4096); + Key2 = PutValue(Zcs, Namespace, Bucket, 43, 2048); + + ZenCacheValue Value1 = GetValue(Zcs, Namespace, Bucket, Key1); + CHECK(Value1.Value); + + std::atomic_bool WorkComplete = false; + Workers.ScheduleWork([&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + WorkComplete = true; + }); + // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket + // Our DropBucket execution blocks any incoming request from completing until we are done with the drop + CHECK(Zcs.DropBucket(Namespace, Bucket)); + while (!WorkComplete) + { + zen::Sleep(1); + } + + // Entire bucket should be dropped, but doing a request should will re-create the namespace but it must still be empty + Value1 = GetValue(Zcs, Namespace, Bucket, Key1); + CHECK(!Value1.Value); + ZenCacheValue Value2 = GetValue(Zcs, Namespace, Bucket, Key2); + CHECK(!Value2.Value); + } +} + +TEST_CASE("z$.drop.namespace") +{ + using namespace testutils; + + const auto CreateCacheValue = [](size_t Size) -> CbObject { + std::vector<uint8_t> Buf; + Buf.resize(Size); + + CbObjectWriter Writer; + Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); + return Writer.Save(); + }; + + ScopedTemporaryDirectory TempDir; + CreateDirectories(TempDir.Path()); + + auto PutValue = + [&CreateCacheValue](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, size_t KeyIndex, size_t Size) { + // Create a cache record + IoHash Key = CreateKey(KeyIndex); + CbObject CacheValue = CreateCacheValue(Size); + + IoBuffer Buffer = CacheValue.GetBuffer().AsIoBuffer(); + Buffer.SetContentType(ZenContentType::kCbObject); + + ZenCacheValue PutValue = {.Value = Buffer}; + Zcs.Put(Namespace, Bucket, Key, PutValue); + return Key; + }; + auto GetValue = [](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { + ZenCacheValue GetValue; + Zcs.Get(Namespace, Bucket, Key, GetValue); + return GetValue; + }; + WorkerThreadPool Workers(1); + { + CasGc Gc; + ZenCacheStore Zcs(Gc, {.BasePath = TempDir.Path() / "cache", .AllowAutomaticCreationOfNamespaces = true}); + const auto Bucket1 = "teardrinker1"sv; + const auto Bucket2 = "teardrinker2"sv; + const auto Namespace1 = "mynamespace1"sv; + const auto Namespace2 = "mynamespace2"sv; + + IoHash Key1 = PutValue(Zcs, Namespace1, Bucket1, 42, 4096); + IoHash Key2 = PutValue(Zcs, Namespace1, Bucket2, 43, 2048); + IoHash Key3 = PutValue(Zcs, Namespace2, Bucket1, 44, 4096); + IoHash Key4 = PutValue(Zcs, Namespace2, Bucket2, 45, 2048); + + ZenCacheValue Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); + CHECK(Value1.Value); + ZenCacheValue Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); + CHECK(Value2.Value); + ZenCacheValue Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); + CHECK(Value3.Value); + ZenCacheValue Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); + CHECK(Value4.Value); + + std::atomic_bool WorkComplete = false; + Workers.ScheduleWork([&]() { + zen::Sleep(100); + Value1.Value = IoBuffer{}; + Value2.Value = IoBuffer{}; + Value3.Value = IoBuffer{}; + Value4.Value = IoBuffer{}; + WorkComplete = true; + }); + // On Windows, DropBucket() will be blocked as long as we hold a reference to a buffer in the bucket + // Our DropBucket execution blocks any incoming request from completing until we are done with the drop + CHECK(Zcs.DropNamespace(Namespace1)); + while (!WorkComplete) + { + zen::Sleep(1); + } + + // Entire namespace should be dropped, but doing a request should will re-create the namespace but it must still be empty + Value1 = GetValue(Zcs, Namespace1, Bucket1, Key1); + CHECK(!Value1.Value); + Value2 = GetValue(Zcs, Namespace1, Bucket2, Key2); + CHECK(!Value2.Value); + Value3 = GetValue(Zcs, Namespace2, Bucket1, Key3); + CHECK(Value3.Value); + Value4 = GetValue(Zcs, Namespace2, Bucket2, Key4); + CHECK(Value4.Value); } } @@ -3093,7 +3354,7 @@ TEST_CASE("z$.blocked.disklayer.put") const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; - Buf.resize(Size); + Buf.resize(Size, Size & 0xff); CbObjectWriter Writer; Writer.AddBinary("Binary"sv, Buf.data(), Buf.size()); @@ -3115,25 +3376,26 @@ TEST_CASE("z$.blocked.disklayer.put") ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); - // Overwriting with a value of same size should go fine - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}); - CbObject CacheValue2 = CreateCacheValue(64 * 1024 + 64 + 1); IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); -# if ZEN_PLATFORM_WINDOWS - // On Windows platform, overwriting with different size while we have - // it open for read should throw exception if file is held open - CHECK_THROWS(Zcs.Put("test_bucket", HashKey, {.Value = Buffer2})); -# else - // Other platforms should handle overwrite just fine + + // We should be able to overwrite even if the file is open for read Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); -# endif - BufferGet = ZenCacheValue{}; + MemoryView OldView = BufferGet.Value.GetView(); - // Read access has been removed, we should now be able to overwrite it - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}); + ZenCacheValue BufferGet2; + CHECK(Zcs.Get("test_bucket", HashKey, BufferGet2)); + MemoryView NewView = BufferGet2.Value.GetView(); + + // Make sure file openend for read before we wrote it still have old data + CHECK(OldView.GetSize() == Buffer.GetSize()); + CHECK(memcmp(OldView.GetData(), Buffer.GetData(), OldView.GetSize()) == 0); + + // Make sure we get the new data when reading after we write new data + CHECK(NewView.GetSize() == Buffer2.GetSize()); + CHECK(memcmp(NewView.GetData(), Buffer2.GetData(), NewView.GetSize()) == 0); } #endif diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 34b8d18f4..c226074fe 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -150,6 +150,7 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); void GatherAccessTimes(zen::access_tracking::AccessTimes& AccessTimes); @@ -193,14 +194,16 @@ private: bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); void Scrub(ScrubContext& Ctx); void GatherAccessTimes(std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); inline uint64_t TotalSize() const { return m_TotalSize; } }; - mutable RwLock m_Lock; - std::unordered_map<std::string, CacheBucket> m_Buckets; - Configuration m_Configuration; + mutable RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + Configuration m_Configuration; ZenCacheMemoryLayer(const ZenCacheMemoryLayer&) = delete; ZenCacheMemoryLayer& operator=(const ZenCacheMemoryLayer&) = delete; @@ -214,6 +217,7 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); @@ -233,19 +237,16 @@ private: CacheBucket(std::string BucketName); ~CacheBucket(); - void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); - static bool Delete(std::filesystem::path BucketDir); - bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const IoHash& HashKey, const ZenCacheValue& Value); - void Drop(); - void Flush(); - void SaveManifest(); - void Scrub(ScrubContext& Ctx); - void GatherReferences(GcContext& GcCtx); - void CollectGarbage(GcContext& GcCtx); - void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); + bool OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GatherReferences(GcContext& GcCtx); + void CollectGarbage(GcContext& GcCtx); + void UpdateAccessTimes(const std::vector<zen::access_tracking::KeyAccessTime>& AccessTimes); - inline bool IsOk() const { return m_IsOk; } inline uint64_t TotalSize() const { return m_TotalSize.load(std::memory_order::relaxed); } private: @@ -257,7 +258,6 @@ private: std::filesystem::path m_BlocksBasePath; BlockStore m_BlockStore; Oid m_BucketId; - bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket @@ -292,16 +292,18 @@ private: std::atomic_uint64_t m_TotalSize{}; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); - void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); - void MakeIndexSnapshot(); - uint64_t ReadIndexFile(); - uint64_t ReadLog(uint64_t LogPosition); - uint64_t MigrateLegacyData(bool CleanSource); - void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + void PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + void MakeIndexSnapshot(); + uint64_t ReadIndexFile(); + uint64_t ReadLog(uint64_t LogPosition); + uint64_t MigrateLegacyData(bool CleanSource); + void OpenLog(const std::filesystem::path& BucketDir, const bool IsNew); + static bool Delete(std::filesystem::path BucketDir); + void SaveManifest(); // These locks are here to avoid contention on file creation, therefore it's sufficient // that we take the same lock for the same hash @@ -314,9 +316,10 @@ private: inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } }; - std::filesystem::path m_RootDir; - mutable RwLock m_Lock; - std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive + std::filesystem::path m_RootDir; + mutable RwLock m_Lock; + std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; // TODO: make this case insensitive + std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; @@ -330,6 +333,7 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Drop(); bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); @@ -360,12 +364,19 @@ public: "!default!"; // This is intentionally not a valid namespace name and will only be used for mapping when no namespace is given static constexpr std::string_view NamespaceDiskPrefix = "ns_"; - ZenCacheStore(CasGc& Gc, std::filesystem::path BasePath); + struct Configuration + { + std::filesystem::path BasePath; + bool AllowAutomaticCreationOfNamespaces = true; + }; + + ZenCacheStore(CasGc& Gc, const Configuration& Configuration); ~ZenCacheStore(); bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Namespace, std::string_view Bucket); + bool DropNamespace(std::string_view Namespace); void Flush(); void Scrub(ScrubContext& Ctx); @@ -377,10 +388,14 @@ private: ZenCacheNamespace* GetNamespace(std::string_view Namespace); void IterateNamespaces(const std::function<void(std::string_view Namespace, ZenCacheNamespace& Store)>& Callback) const; - typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NameSpaceMap; + typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; + + mutable RwLock m_NamespacesLock; + NamespaceMap m_Namespaces; + std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; - mutable RwLock m_NamespacesLock; - NameSpaceMap m_Namespaces; + CasGc& m_Gc; + Configuration m_Configuration; }; void z$_forcelink(); diff --git a/zenserver/config.cpp b/zenserver/config.cpp index be91ae4f8..c534865dc 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -478,6 +478,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) "Size of gc disk reserve in bytes.", cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"), ""); + + options.add_option("gc", + "", + "gc-monitor-interval-seconds", + "Garbage collection monitoring interval in seconds.", + cxxopts::value<int32_t>(ServerOptions.GcConfig.MonitorIntervalSeconds)->default_value("30"), + ""); try { auto result = options.parse(argc, argv); @@ -770,8 +777,9 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio if (sol::optional<sol::table> GcConfig = lua["gc"]) { - ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); - ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); + ServerOptions.GcConfig.MonitorIntervalSeconds = GcConfig.value().get_or("monitorintervalseconds", 30); + ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); + ServerOptions.GcConfig.DiskReserveSize = GcConfig.value().get_or("diskreservesize", uint64_t(1u << 28)); if (sol::optional<sol::table> CacheGcConfig = GcConfig.value()["cache"]) { diff --git a/zenserver/config.h b/zenserver/config.h index 49f039d8d..a07bba9a4 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -96,10 +96,11 @@ struct ZenGcConfig { ZenCasEvictionPolicy Cas; ZenCacheEvictionPolicy Cache; - int32_t IntervalSeconds = 0; - bool CollectSmallObjects = true; - bool Enabled = true; - uint64_t DiskReserveSize = 1ul << 28; + int32_t MonitorIntervalSeconds = 30; + int32_t IntervalSeconds = 0; + bool CollectSmallObjects = true; + bool Enabled = true; + uint64_t DiskReserveSize = 1ul << 28; }; struct ZenServerOptions diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index 2ec24b303..38c798569 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -157,8 +157,13 @@ namespace detail { { ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks(); - std::scoped_lock Lock(m_TaskMutex); - if (m_PendingTasks.contains(UpstreamData.TaskId)) + + bool AlreadyQueued; + { + std::scoped_lock Lock(m_TaskMutex); + AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId); + } + if (AlreadyQueued) { // Pending task is already queued, return success ApplyResult.Success = true; @@ -171,7 +176,7 @@ namespace detail { CloudCacheSession StorageSession(m_StorageClient); { - CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds); ApplyResult.Bytes += Result.Bytes; ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks(); @@ -182,6 +187,22 @@ namespace detail { return ApplyResult; } UpstreamData.Blobs.clear(); + UpstreamData.CasIds.clear(); + } + + { + CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids); + ApplyResult.Bytes += Result.Bytes; + ApplyResult.ElapsedSeconds += Result.ElapsedSeconds; + ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks(); + if (!Result.Success) + { + ApplyResult.Error = { + .ErrorCode = Result.ErrorCode, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"}; + return ApplyResult; + } + UpstreamData.Cids.clear(); } { @@ -279,9 +300,11 @@ namespace detail { } } - [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, + const std::map<IoHash, IoBuffer>& Blobs, + const std::set<IoHash>& CasIds) { - if (Blobs.size() == 0) + if (Blobs.size() == 0 && CasIds.size() == 0) { return {.Success = true}; } @@ -292,6 +315,7 @@ namespace detail { // Batch check for missing blobs std::set<IoHash> Keys; std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; }); + Keys.insert(CasIds.begin(), CasIds.end()); CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys); Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}", @@ -310,7 +334,22 @@ namespace detail { for (const auto& Hash : ExistsResult.Needs) { - CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, Blobs.at(Hash)); + IoBuffer DataBuffer; + if (Blobs.contains(Hash)) + { + DataBuffer = Blobs.at(Hash); + } + else + { + DataBuffer = m_CasStore.FindChunk(Hash); + if (!DataBuffer) + { + Log().warn("Put blob FAILED, input chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"}; + } + } + + CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -326,6 +365,62 @@ namespace detail { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; } + [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids) + { + if (Cids.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing compressed blobs + CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids); + Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}", + Cids.size(), + ExistsResult.Needs.size(), + ExistsResult.ElapsedSeconds, + ExistsResult.Success); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (!ExistsResult.Success) + { + return { + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, + .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"}; + } + + for (const auto& Hash : ExistsResult.Needs) + { + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash); + if (!DataBuffer) + { + Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash); + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"}; + } + + CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer); + Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}", + Hash, + Result.Bytes, + Result.ElapsedSeconds, + Result.Success); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) { if (Objects.size() == 0) @@ -599,6 +694,8 @@ namespace detail { { std::map<IoHash, IoBuffer> Blobs; std::map<IoHash, CbObject> Objects; + std::set<IoHash> CasIds; + std::set<IoHash> Cids; IoHash TaskId; IoHash RequirementsId; }; @@ -957,7 +1054,7 @@ namespace detail { for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) { CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) { return false; } @@ -966,7 +1063,7 @@ namespace detail { for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) { CbObjectView FileEntry = It.AsObjectView(); - if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds)) { return false; } @@ -1034,11 +1131,10 @@ namespace detail { bool AnyErrors = false; ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { - const IoHash Cid = Field.AsHash(); - const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; - if (!DataBuffer) + if (!m_CidStore.ContainsChunk(Cid)) { Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); AnyErrors = true; @@ -1050,11 +1146,9 @@ namespace detail { return; } - const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); - InputFiles.insert(FilePath); - InputFileHashes[FilePath] = CompressedId; - Data.Blobs[CompressedId] = std::move(DataBuffer); + InputFileHashes[FilePath] = Cid; + Data.Cids.insert(Cid); }); if (AnyErrors) @@ -1067,7 +1161,7 @@ namespace detail { const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); - CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects); const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); @@ -1118,28 +1212,18 @@ namespace detail { [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, std::set<std::filesystem::path>& InputFiles, std::map<std::filesystem::path, IoHash>& InputFileHashes, - std::map<IoHash, IoBuffer>& Blobs) + std::set<IoHash>& CasIds) { - const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); - const IoHash ChunkId = FileEntry["hash"sv].AsHash(); - const uint64_t Size = FileEntry["size"sv].AsUInt64(); - IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); - if (!DataBuffer) + if (!m_CasStore.ContainsChunk(ChunkId)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); return false; } - if (DataBuffer.Size() != Size) - { - Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", - ChunkId, - DataBuffer.Size(), - Size); - return false; - } - if (InputFiles.contains(FilePath)) { Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); @@ -1148,7 +1232,7 @@ namespace detail { InputFiles.insert(FilePath); InputFileHashes[FilePath] = ChunkId; - Blobs[ChunkId] = std::move(DataBuffer); + CasIds.insert(ChunkId); return true; } @@ -1204,7 +1288,7 @@ namespace detail { [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, const std::map<std::filesystem::path, IoHash>& InputFileHashes, - const std::map<IoHash, IoBuffer>& Blobs, + const std::set<IoHash>& Cids, std::map<IoHash, CbObject>& Objects) { CbObjectWriter DirectoryTreeWriter; @@ -1214,14 +1298,13 @@ namespace detail { DirectoryTreeWriter.BeginArray("f"sv); for (const auto& File : RootDirectory.Files) { - const std::filesystem::path FilePath = {RootDirectory.Path / File}; - const IoHash& FileHash = InputFileHashes.at(FilePath); - const uint64_t FileSize = Blobs.at(FileHash).Size(); + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const bool Compressed = Cids.contains(FileHash); DirectoryTreeWriter.BeginObject(); DirectoryTreeWriter.AddString("n"sv, File); DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); - DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size - // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.AddBool("c"sv, Compressed); DirectoryTreeWriter.EndObject(); } DirectoryTreeWriter.EndArray(); @@ -1232,7 +1315,7 @@ namespace detail { DirectoryTreeWriter.BeginArray("d"sv); for (const auto& Item : RootDirectory.Directories) { - CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects); const IoHash DirectoryHash = Directory.GetHash(); Objects[DirectoryHash] = std::move(Directory); diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index efc75b5b4..0237ec346 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -408,14 +408,15 @@ ZenStructuredCacheSession::CheckHealth() } ZenCacheResult -ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type) +ZenStructuredCacheSession::GetCacheRecord(std::string_view, std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/"; - if (Namespace != ZenCacheStore::DefaultNamespace) - { - Uri << Namespace << "/"; - } + // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support + // if (Namespace != ZenCacheStore::DefaultNamespace) + // { + // Uri << Namespace << "/"; + // } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -437,17 +438,15 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::strin } ZenCacheResult -ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - const IoHash& ValueContentId) +ZenStructuredCacheSession::GetCacheValue(std::string_view, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId) { ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/"; - if (Namespace != ZenCacheStore::DefaultNamespace) - { - Uri << Namespace << "/"; - } + // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support + // if (Namespace != ZenCacheStore::DefaultNamespace) + // { + // Uri << Namespace << "/"; + // } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -466,11 +465,15 @@ ZenStructuredCacheSession::GetCacheValue(std::string_view Namespace, const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = Buffer, + .Bytes = Response.downloaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; } ZenCacheResult -ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, +ZenStructuredCacheSession::PutCacheRecord(std::string_view, std::string_view BucketId, const IoHash& Key, IoBuffer Value, @@ -478,10 +481,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, { ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/"; - if (Namespace != ZenCacheStore::DefaultNamespace) - { - Uri << Namespace << "/"; - } + // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support + // if (Namespace != ZenCacheStore::DefaultNamespace) + // { + // Uri << Namespace << "/"; + // } Uri << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -501,13 +505,12 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace, return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult -ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, +ZenStructuredCacheSession::PutCacheValue(std::string_view, std::string_view BucketId, const IoHash& Key, const IoHash& ValueContentId, @@ -515,10 +518,11 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, { ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/"; - if (Namespace != ZenCacheStore::DefaultNamespace) - { - Uri << Namespace << "/"; - } + // TODO: DE20220530: Disable adding namespace into URL until we have updated the shared instances with namespace support + // if (Namespace != ZenCacheStore::DefaultNamespace) + // { + // Uri << Namespace << "/"; + // } Uri << BucketId << "/" << Key.ToHexString() << "/" << ValueContentId.ToHexString(); cpr::Session& Session = m_SessionState->GetSession(); @@ -535,9 +539,8 @@ ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace, return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; } - return {.Bytes = Response.uploaded_bytes, - .ElapsedSeconds = Response.elapsed, - .Success = (Response.status_code == 200 || Response.status_code == 201)}; + const bool Success = Response.status_code == 200 || Response.status_code == 201; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Reason = Response.reason, .Success = Success}; } ZenCacheResult @@ -566,7 +569,11 @@ ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = std::move(Buffer), + .Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; } ZenCacheResult @@ -594,7 +601,11 @@ ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request) const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; + return {.Response = std::move(Buffer), + .Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Reason = Response.reason, + .Success = Success}; } } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 53a046f80..4db69c265 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -364,6 +364,7 @@ public: ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); zen::GcSchedulerConfig GcConfig{ .RootDirectory = m_DataRoot / "gc", + .MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds), .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, @@ -754,7 +755,9 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) using namespace std::literals; ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache"); + m_CacheStore = std::make_unique<ZenCacheStore>( + m_CasGc, + ZenCacheStore::Configuration{.BasePath = m_DataRoot / "cache", .AllowAutomaticCreationOfNamespaces = true}); const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; @@ -1161,7 +1164,7 @@ test_main(int argc, char** argv) zen::MaximizeOpenFileCount(); - return ZEN_RUN_TESTS(argc, argv); + return ZEN_RUN_TESTS(argc, argv); } #endif diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp index 8b3d6c648..00c1136b6 100644 --- a/zenstore-test/zenstore-test.cpp +++ b/zenstore-test/zenstore-test.cpp @@ -25,7 +25,7 @@ main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) zen::logging::InitializeLogging(); zen::MaximizeOpenFileCount(); - return ZEN_RUN_TESTS(argc, argv); + return ZEN_RUN_TESTS(argc, argv); #else return 0; #endif diff --git a/zenstore/blockstore.cpp b/zenstore/blockstore.cpp index d490678b5..4e61c23cf 100644 --- a/zenstore/blockstore.cpp +++ b/zenstore/blockstore.cpp @@ -225,23 +225,27 @@ BlockStore::WriteChunk(const void* Data, uint64_t Size, uint64_t Alignment, Writ { m_WriteBlock = nullptr; } + + if (m_ChunkBlocks.size() == m_MaxBlockCount) { - if (m_ChunkBlocks.size() == m_MaxBlockCount) - { - throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); - } - WriteBlockIndex += IsWriting ? 1 : 0; - while (m_ChunkBlocks.contains(WriteBlockIndex)) - { - WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); - } - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); - m_WriteBlock = new BlockStoreFile(BlockPath); - m_ChunkBlocks[WriteBlockIndex] = m_WriteBlock; - m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); + throw std::runtime_error(fmt::format("unable to allocate a new block in '{}'", m_BlocksBasePath)); } + + WriteBlockIndex += IsWriting ? 1 : 0; + while (m_ChunkBlocks.contains(WriteBlockIndex)) + { + WriteBlockIndex = (WriteBlockIndex + 1) & (m_MaxBlockCount - 1); + } + + std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, WriteBlockIndex); + + Ref<BlockStoreFile> NewBlockFile = new BlockStoreFile(BlockPath); + NewBlockFile->Create(m_MaxBlockSize); + + m_ChunkBlocks[WriteBlockIndex] = NewBlockFile; + m_WriteBlock = NewBlockFile; + m_WriteBlockIndex.store(WriteBlockIndex, std::memory_order_release); m_CurrentInsertOffset = 0; - m_WriteBlock->Create(m_MaxBlockSize); } uint64_t InsertOffset = m_CurrentInsertOffset; m_CurrentInsertOffset = RoundUp(InsertOffset + Size, Alignment); @@ -268,6 +272,10 @@ BlockStore::GetReclaimSnapshotState() { State.m_ActiveWriteBlocks.insert(BlockIndex); } + if (m_WriteBlock) + { + State.m_ActiveWriteBlocks.insert(m_WriteBlockIndex); + } State.BlockCount = m_ChunkBlocks.size(); return State; } @@ -911,12 +919,6 @@ BlockStore::GetBlockPath(const std::filesystem::path& BlocksBasePath, const uint #if ZEN_WITH_TESTS -static bool -operator==(const BlockStoreLocation& Lhs, const BlockStoreLocation& Rhs) -{ - return Lhs.BlockIndex == Rhs.BlockIndex && Lhs.Offset == Rhs.Offset && Lhs.Size == Rhs.Size; -} - TEST_CASE("blockstore.blockstoredisklocation") { BlockStoreLocation Zero = BlockStoreLocation{.BlockIndex = 0, .Offset = 0, .Size = 0}; diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index c277359bd..65f959a0e 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -1894,8 +1894,12 @@ TEST_CASE("compactcas.threadedinsert") } } -TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true)) +TEST_CASE("compactcas.migrate.large.data") // * doctest::skip(true)) { + if (true) + { + return; + } const char* BigDataPath = "D:\\zen-data\\dc4-zen-cache-t\\cas"; std::filesystem::path TobsBasePath = GetBasePath(BigDataPath, "tobs"); std::filesystem::path SobsBasePath = GetBasePath(BigDataPath, "sobs"); diff --git a/zenstore/gc.cpp b/zenstore/gc.cpp index 4b50668d9..8e2d441f8 100644 --- a/zenstore/gc.cpp +++ b/zenstore/gc.cpp @@ -343,7 +343,7 @@ GcStorage::GcStorage(CasGc& Gc) : m_Gc(Gc) GcStorage::~GcStorage() { - m_Gc.AddGcStorage(this); + m_Gc.RemoveGcStorage(this); } ////////////////////////////////////////////////////////////////////////// @@ -373,6 +373,7 @@ CasGc::RemoveGcContributor(GcContributor* Contributor) void CasGc::AddGcStorage(GcStorage* Storage) { + ZEN_ASSERT(Storage != nullptr); RwLock::ExclusiveLockScope _(m_Lock); m_GcStorage.push_back(Storage); } diff --git a/zenstore/include/zenstore/blockstore.h b/zenstore/include/zenstore/blockstore.h index 34c475fb6..000395fb9 100644 --- a/zenstore/include/zenstore/blockstore.h +++ b/zenstore/include/zenstore/blockstore.h @@ -64,6 +64,8 @@ struct BlockStoreDiskLocation inline uint64_t GetSize() const { return m_Size; } + inline auto operator<=>(const BlockStoreDiskLocation& Rhs) const = default; + private: inline void Init(uint32_t BlockIndex, uint64_t Offset, uint64_t Size) { |