aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-11-07 10:18:44 +0100
committerGitHub <[email protected]>2022-11-07 01:18:44 -0800
commitbb07e58a9c59705b54164b06bcbe40c052880d90 (patch)
tree2b691d876b232f5354459a9c4a5cb44c21d541a8
parent0.1.8 (diff)
downloadzen-bb07e58a9c59705b54164b06bcbe40c052880d90.tar.xz
zen-bb07e58a9c59705b54164b06bcbe40c052880d90.zip
Support file reference in package message (#184)
* Fix packed message parsing for absolute path * Always enable are sharing when opening files as IoBuffers. * Allow control over sending partial files as localfile ref * Check "AcceptFlags" field in RPC message for allowing localfile ref in reply * make oplog entry add operations ZEN_DEBUG level logs * changelog
-rw-r--r--CHANGELOG.md3
-rw-r--r--zencore/include/zencore/iobuffer.h1
-rw-r--r--zencore/iobuffer.cpp36
-rw-r--r--zenhttp/httpshared.cpp25
-rw-r--r--zenhttp/include/zenhttp/httpshared.h13
-rw-r--r--zenserver/cache/structuredcache.cpp34
-rw-r--r--zenserver/cache/structuredcache.h2
-rw-r--r--zenserver/cache/structuredcachestore.cpp2
-rw-r--r--zenserver/projectstore.cpp12
9 files changed, 76 insertions, 52 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 79938caf7..7404dd162 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
##
+- Improvement: Send attachments as file references if the IoBuffer we find represents a complete file and `AcceptFlags` in RPC request allows it.
+
+## v0.1.8
- Change: Responding with new wire format for RPC requests requires the requestor to add a `Accept` field in the request. This is to allow compatability with older clients for shared instances.
- Improvement: Fixed concurrency issues in project store - project and oplog lifetime issues.
- Improvement: Don't open oplogs until we require use of them.
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index b38201ba3..7f107cc0f 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -409,7 +409,6 @@ class IoBufferBuilder
{
public:
ZENCORE_API static IoBuffer MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset = 0, uint64_t Size = ~0ull);
- ZENCORE_API static IoBuffer MakeFromFileWithSharedDelete(const std::filesystem::path& FileName);
ZENCORE_API static IoBuffer MakeFromTemporaryFile(const std::filesystem::path& FileName);
ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull);
ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer);
diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp
index 177b7dfb6..16dd22a58 100644
--- a/zencore/iobuffer.cpp
+++ b/zencore/iobuffer.cpp
@@ -480,20 +480,16 @@ IoBufferBuilder::MakeFromFileHandle(void* FileHandle, uint64_t Offset, uint64_t
return IoBuffer(IoBuffer::BorrowedFile, FileHandle, Offset, Size);
}
-static IoBuffer
-MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size, bool UseShareDelete)
+IoBuffer
+IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size)
{
uint64_t FileSize;
#if ZEN_PLATFORM_WINDOWS
CAtlFile DataFile;
- DWORD ShareOptions = FILE_SHARE_READ;
- if (UseShareDelete)
- {
- ShareOptions |= FILE_SHARE_DELETE;
- }
- HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, FILE_SHARE_READ | ShareOptions, OPEN_EXISTING);
+ DWORD ShareOptions = FILE_SHARE_DELETE | FILE_SHARE_WRITE | FILE_SHARE_DELETE | FILE_SHARE_READ;
+ HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, ShareOptions, OPEN_EXISTING);
if (FAILED(hRes))
{
@@ -502,12 +498,8 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset,
DataFile.GetSize((ULONGLONG&)FileSize);
#else
- int Flags = O_RDONLY;
- if (UseShareDelete)
- {
- Flags |= O_CLOEXEC;
- }
- int Fd = open(FileName.c_str(), Flags);
+ int Flags = O_RDONLY | O_CLOEXEC;
+ int Fd = open(FileName.c_str(), Flags);
if (Fd < 0)
{
return {};
@@ -539,7 +531,9 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset,
#if ZEN_PLATFORM_WINDOWS
void* Fd = DataFile.Detach();
#endif
- return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size);
+ IoBuffer Iob(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size);
+ Iob.m_Core->SetIsWholeFile(Offset == 0 && Size == FileSize);
+ return Iob;
}
#if !ZEN_PLATFORM_WINDOWS
@@ -551,18 +545,6 @@ MakeFromFileWithOptions(const std::filesystem::path& FileName, uint64_t Offset,
}
IoBuffer
-IoBufferBuilder::MakeFromFileWithSharedDelete(const std::filesystem::path& FileName)
-{
- return MakeFromFileWithOptions(FileName, 0, ~0ull, true);
-}
-
-IoBuffer
-IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Offset, uint64_t Size)
-{
- return MakeFromFileWithOptions(FileName, Offset, Size, false);
-}
-
-IoBuffer
IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName)
{
uint64_t FileSize;
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index f9aa3af82..a7dca5441 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -6,14 +6,14 @@
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
+#include <zencore/logging.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
-#include <zencore/fmtutils.h>
-
#include <span>
#include <vector>
@@ -93,7 +93,10 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
ResponseBuffers.push_back(std::move(RefBuffer));
};
- auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool {
+ auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary,
+ bool DenyPartialLocalReferences,
+ CbAttachmentReferenceHeader& LocalRef,
+ std::string& Path8) -> bool {
const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
IoBufferFileReference Ref;
const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
@@ -103,6 +106,11 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
return false;
}
+ if (DenyPartialLocalReferences && !SegmentBuffer.IsWholeFile())
+ {
+ return false;
+ }
+
ExtendablePathBuilder<256> LocalRefFile;
LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
Path8 = LocalRefFile.ToUtf8();
@@ -131,19 +139,20 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
// segments to be marshaled at once
bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1);
-
+ bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences);
CbAttachmentReferenceHeader LocalRef;
std::string Path8;
if (MarshalByLocalRef)
{
- MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8);
+ MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, LocalRef, Path8);
}
if (MarshalByLocalRef)
{
const bool IsCompressed = true;
MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, Compressed.GetSize());
}
else
{
@@ -171,19 +180,21 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
bool MarshalByLocalRef =
EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
+ bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences);
CbAttachmentReferenceHeader LocalRef;
std::string Path8;
if (MarshalByLocalRef)
{
- MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8);
+ MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, LocalRef, Path8);
}
if (MarshalByLocalRef)
{
const bool IsCompressed = false;
MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, AttachmentBinary.GetSize());
}
else
{
@@ -270,7 +281,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
- std::filesystem::path Path{PathPointer};
+ std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)};
if (IoBuffer ChunkReference =
IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize))
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index 0265d8d1e..7ab9c9339 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -78,12 +78,21 @@ static_assert(sizeof(CbAttachmentEntry) == 32);
enum class FormatFlags
{
- kDefault = 0,
- kAllowLocalReferences = (1u << 0)
+ kDefault = 0,
+ kAllowLocalReferences = (1u << 0),
+ kDenyPartialLocalReferences = (1u << 1)
};
gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
+enum class RpcAcceptOptions : uint16_t
+{
+ kNone = 0,
+ kAllowLocalReferences = (1u << 0),
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions);
+
std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags);
CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags);
CbPackage ParsePackageMessage(
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 0e2462a4a..c5beef4b3 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1214,7 +1214,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
if (AcceptMagic == kCbPkgMagic)
{
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
@@ -1316,7 +1316,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+ uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
@@ -1649,7 +1651,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
if (AcceptMagic == kCbPkgMagic)
{
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ ResponsePackage,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
@@ -1771,7 +1775,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (AcceptMagic == kCbPkgMagic)
{
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
@@ -1792,7 +1796,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+ uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+ RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
@@ -1977,7 +1983,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (AcceptMagic == kCbPkgMagic)
{
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResponse,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
@@ -2043,9 +2051,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
uint32_t AcceptMagic = 0;
+ uint16_t AcceptFlags = 0;
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
if (!ParseGetCacheChunksRequest(AcceptMagic,
+ AcceptFlags,
Namespace,
RecordKeys,
Records,
@@ -2069,11 +2079,12 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(AcceptMagic, Namespace, Requests, HttpRequest);
+ WriteGetCacheChunksResponse(AcceptMagic, AcceptFlags, Namespace, Requests, HttpRequest);
}
bool
HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
+ uint16_t& AcceptFlags,
std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
@@ -2088,6 +2099,7 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& Accept
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
+ AcceptFlags = RpcRequest["AcceptFlags"sv].AsUInt16(0u);
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
@@ -2440,12 +2452,16 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
void
HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic,
+ uint16_t AcceptFlags,
std::string_view Namespace,
std::vector<cache::detail::ChunkRequest>& Requests,
zen::HttpServerRequest& HttpRequest)
{
using namespace cache::detail;
+ RpcAcceptOptions AcceptOptions = static_cast<RpcAcceptOptions>(AcceptFlags);
+ bool AllowFileReferences = EnumHasAllFlags(AcceptOptions, RpcAcceptOptions::kAllowLocalReferences);
+
CbPackage RpcResponse;
CbObjectWriter Writer;
@@ -2505,7 +2521,9 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
if (AcceptMagic == kCbPkgMagic)
{
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResponse,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index bda838dbc..a74d6b7a6 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -118,6 +118,7 @@ private:
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
+ uint16_t& AcceptFlags,
std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
@@ -143,6 +144,7 @@ private:
std::vector<cache::detail::ChunkRequest>& Requests);
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
void WriteGetCacheChunksResponse(uint32_t AcceptMagic,
+ uint16_t AcceptFlags,
std::string_view Namespace,
std::vector<cache::detail::ChunkRequest>& Requests,
zen::HttpServerRequest& HttpRequest);
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index e25759192..1fab3b312 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -904,7 +904,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
RwLock::SharedLockScope ValueLock(LockForHash(HashKey));
- if (IoBuffer Data = IoBufferBuilder::MakeFromFileWithSharedDelete(DataFilePath.ToPath()))
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.ToPath()))
{
OutValue.Value = Data;
OutValue.Value.SetContentType(Loc.GetContentType());
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index d953651af..ef235557c 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -565,11 +565,11 @@ ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry
}
}
- ZEN_INFO("added {} file(s), {} as files and {} as chunks in {}",
- FileCount + ChunkCount,
- FileCount,
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}",
+ FileCount + ChunkCount,
+ FileCount,
+ ChunkCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
for (CbFieldView& Entry : Core["meta"sv])
@@ -1842,7 +1842,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
+ ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString());
HttpReq.WriteResponse(HttpResponseCode::Created);
},