diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-07 10:18:44 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-11-07 01:18:44 -0800 |
| commit | bb07e58a9c59705b54164b06bcbe40c052880d90 (patch) | |
| tree | 2b691d876b232f5354459a9c4a5cb44c21d541a8 | |
| parent | 0.1.8 (diff) | |
| download | zen-bb07e58a9c59705b54164b06bcbe40c052880d90.tar.xz zen-bb07e58a9c59705b54164b06bcbe40c052880d90.zip | |
Support file reference in package message (#184)
* Fix packed message parsing for absolute path
* Always enable are sharing when opening files as IoBuffers.
* Allow control over sending partial files as localfile ref
* Check "AcceptFlags" field in RPC message for allowing localfile ref in reply
* make oplog entry add operations ZEN_DEBUG level logs
* changelog
| -rw-r--r-- | CHANGELOG.md | 3 | ||||
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 1 | ||||
| -rw-r--r-- | zencore/iobuffer.cpp | 36 | ||||
| -rw-r--r-- | zenhttp/httpshared.cpp | 25 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpshared.h | 13 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 34 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 2 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 2 | ||||
| -rw-r--r-- | zenserver/projectstore.cpp | 12 |
9 files changed, 76 insertions, 52 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 79938caf7..7404dd162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## +- Improvement: Send attachments as file references if the IoBuffer we find represents a complete file and `AcceptFlags` in RPC request allows it. + +## v0.1.8 - Change: Responding with new wire format for RPC requests requires the requestor to add a `Accept` field in the request. This is to allow compatability with older clients for shared instances. - Improvement: Fixed concurrency issues in project store - project and oplog lifetime issues. - Improvement: Don't open oplogs until we require use of them. diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index b38201ba3..7f107cc0f 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -409,7 +409,6 @@ class IoBufferBuilder { public: ZENCORE_API static IoBuffer MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset = 0, uint64_t Size = ~0ull); - ZENCORE_API static IoBuffer MakeFromFileWithSharedDelete(const std::filesystem::path& FileName); ZENCORE_API static IoBuffer MakeFromTemporaryFile(const std::filesystem::path& FileName); ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull); ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer); diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index 177b7dfb6..16dd22a58 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -480,20 +480,16 @@ IoBufferBuilder::MakeFromFileHandle(void* FileHandle, uint64_t Offset, uint64_t return IoBuffer(IoBuffer::BorrowedFile, FileHandle, Offset, Size); } -static IoBuffer -MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size, bool UseShareDelete) +IoBuffer +IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size) { uint64_t FileSize; #if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; - DWORD ShareOptions = FILE_SHARE_READ; - if (UseShareDelete) - { - ShareOptions |= FILE_SHARE_DELETE; - } - HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ | ShareOptions, OPEN_EXISTING); + DWORD ShareOptions = FILE_SHARE_DELETE | FILE_SHARE_WRITE | FILE_SHARE_DELETE | FILE_SHARE_READ; + HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, ShareOptions, OPEN_EXISTING); if (FAILED(hRes)) { @@ -502,12 +498,8 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, DataFile.GetSize((ULONGLONG&)FileSize); #else - int Flags = O_RDONLY; - if (UseShareDelete) - { - Flags |= O_CLOEXEC; - } - int Fd = open(FileName.c_str(), Flags); + int Flags = O_RDONLY | O_CLOEXEC; + int Fd = open(FileName.c_str(), Flags); if (Fd < 0) { return {}; @@ -539,7 +531,9 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, #if ZEN_PLATFORM_WINDOWS void* Fd = DataFile.Detach(); #endif - return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size); + IoBuffer Iob(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size); + Iob.m_Core->SetIsWholeFile(Offset == 0 && Size == FileSize); + return Iob; } #if !ZEN_PLATFORM_WINDOWS @@ -551,18 +545,6 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, } IoBuffer -IoBufferBuilder::MakeFromFileWithSharedDelete(const std::filesystem::path& FileName) -{ - return MakeFromFileWithOptions(FileName, 0, ~0ull, true); -} - -IoBuffer -IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size) -{ - return MakeFromFileWithOptions(FileName, Offset, Size, false); -} - -IoBuffer IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) { uint64_t FileSize; diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index f9aa3af82..a7dca5441 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -6,14 +6,14 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/logging.h> #include <zencore/stream.h> #include <zencore/testing.h> #include <zencore/testutils.h> -#include <zencore/fmtutils.h> - #include <span> #include <vector> @@ -93,7 +93,10 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) ResponseBuffers.push_back(std::move(RefBuffer)); }; - auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { + auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, + bool DenyPartialLocalReferences, + CbAttachmentReferenceHeader& LocalRef, + std::string& Path8) -> bool { const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); IoBufferFileReference Ref; const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); @@ -103,6 +106,11 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) return false; } + if (DenyPartialLocalReferences && !SegmentBuffer.IsWholeFile()) + { + return false; + } + ExtendablePathBuilder<256> LocalRefFile; LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); Path8 = LocalRefFile.ToUtf8(); @@ -131,19 +139,20 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) // segments to be marshaled at once bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - + bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); CbAttachmentReferenceHeader LocalRef; std::string Path8; if (MarshalByLocalRef) { - MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8); + MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = true; MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, Compressed.GetSize()); } else { @@ -171,19 +180,21 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); + bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); CbAttachmentReferenceHeader LocalRef; std::string Path8; if (MarshalByLocalRef) { - MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8); + MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = false; MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, AttachmentBinary.GetSize()); } else { @@ -270,7 +281,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); - std::filesystem::path Path{PathPointer}; + std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)}; if (IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h index 0265d8d1e..7ab9c9339 100644 --- a/zenhttp/include/zenhttp/httpshared.h +++ b/zenhttp/include/zenhttp/httpshared.h @@ -78,12 +78,21 @@ static_assert(sizeof(CbAttachmentEntry) == 32); enum class FormatFlags { - kDefault = 0, - kAllowLocalReferences = (1u << 0) + kDefault = 0, + kAllowLocalReferences = (1u << 0), + kDenyPartialLocalReferences = (1u << 1) }; gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags); +enum class RpcAcceptOptions : uint16_t +{ + kNone = 0, + kAllowLocalReferences = (1u << 0), +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions); + std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags); CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags); CbPackage ParsePackageMessage( diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 0e2462a4a..c5beef4b3 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1214,7 +1214,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req if (AcceptMagic == kCbPkgMagic) { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -1316,7 +1316,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u)); + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); @@ -1649,7 +1651,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt if (AcceptMagic == kCbPkgMagic) { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + ResponsePackage, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -1771,7 +1775,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ if (AcceptMagic == kCbPkgMagic) { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -1792,7 +1796,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u)); + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); @@ -1977,7 +1983,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http if (AcceptMagic == kCbPkgMagic) { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResponse, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else @@ -2043,9 +2051,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream uint32_t AcceptMagic = 0; + uint16_t AcceptFlags = 0; // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests if (!ParseGetCacheChunksRequest(AcceptMagic, + AcceptFlags, Namespace, RecordKeys, Records, @@ -2069,11 +2079,12 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client - WriteGetCacheChunksResponse(AcceptMagic, Namespace, Requests, HttpRequest); + WriteGetCacheChunksResponse(AcceptMagic, AcceptFlags, Namespace, Requests, HttpRequest); } bool HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic, + uint16_t& AcceptFlags, std::string& Namespace, std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, @@ -2088,6 +2099,7 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& Accept ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); AcceptMagic = RpcRequest["Accept"sv].AsUInt32(); + AcceptFlags = RpcRequest["AcceptFlags"sv].AsUInt16(0u); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); @@ -2440,12 +2452,16 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names void HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic, + uint16_t AcceptFlags, std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; + RpcAcceptOptions AcceptOptions = static_cast<RpcAcceptOptions>(AcceptFlags); + bool AllowFileReferences = EnumHasAllFlags(AcceptOptions, RpcAcceptOptions::kAllowLocalReferences); + CbPackage RpcResponse; CbObjectWriter Writer; @@ -2505,7 +2521,9 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept if (AcceptMagic == kCbPkgMagic) { - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResponse, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } else diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index bda838dbc..a74d6b7a6 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -118,6 +118,7 @@ private: /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(uint32_t& AcceptMagic, + uint16_t& AcceptFlags, std::string& Namespace, std::vector<CacheKeyRequest>& RecordKeys, std::vector<cache::detail::RecordBody>& Records, @@ -143,6 +144,7 @@ private: std::vector<cache::detail::ChunkRequest>& Requests); /** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */ void WriteGetCacheChunksResponse(uint32_t AcceptMagic, + uint16_t AcceptFlags, std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests, zen::HttpServerRequest& HttpRequest); diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index e25759192..1fab3b312 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -904,7 +904,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); - if (IoBuffer Data = IoBufferBuilder::MakeFromFileWithSharedDelete(DataFilePath.ToPath())) + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath())) { OutValue.Value = Data; OutValue.Value.SetContentType(Loc.GetContentType()); diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d953651af..ef235557c 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -565,11 +565,11 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry } } - ZEN_INFO("added {} file(s), {} as files and {} as chunks in {}", - FileCount + ChunkCount, - FileCount, - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}", + FileCount + ChunkCount, + FileCount, + ChunkCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } for (CbFieldView& Entry : Core["meta"sv]) @@ -1842,7 +1842,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); HttpReq.WriteResponse(HttpResponseCode::Created); }, |