// blob: 56987763918226c541fc542f30f6e8925218892e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using Microsoft.Win32.SafeHandles;
using System;
using System.IO.Pipes;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.ComponentModel;
using System.Security.AccessControl;
using System.Security.Principal;
namespace Thrift.Transport.Server
{
// ReSharper disable once InconsistentNaming
public class TNamedPipeServerTransport : TServerTransport
{
    /// <summary>
    /// This is the address of the Pipe on the localhost.
    /// </summary>
    private readonly string _pipeAddress;

    // True while pipe instances are created in asynchronous mode; cleared by the
    // synchronous fallback in EnsurePipeInstance().
    private bool _asyncMode = true;

    // Value reported by IsClientPending(); reset by Close() when it tears down a pipe instance.
    private volatile bool _isPending = true;

    // The pipe instance that waits for the next client. Null until EnsurePipeInstance() creates
    // one, and null again after ownership moved to a ServerTransport or after Close().
    private NamedPipeServerStream _stream = null;

    /// <summary>
    /// Creates a server transport for the given pipe name. No pipe is created here;
    /// the pipe instance is created lazily when a client is accepted.
    /// </summary>
    /// <param name="pipeAddress">Name of the pipe (without the <c>\\.\pipe\</c> prefix).</param>
    /// <param name="config">Configuration handed to the base class and to accepted client transports.</param>
    public TNamedPipeServerTransport(string pipeAddress, TConfiguration config)
        : base(config)
    {
        _pipeAddress = pipeAddress;
    }

    /// <summary>
    /// Always reports <c>true</c>; this transport has no separate "opened" state.
    /// </summary>
    public override bool IsOpen()
    {
        return true;
    }

    /// <summary>
    /// Does nothing: the pipe instance is created on demand by the accept call.
    /// </summary>
    public override void Listen()
    {
        // nothing to do here
    }

    /// <summary>
    /// Disconnects and disposes the currently held pipe instance, if any, and marks
    /// the transport as having no pending client.
    /// </summary>
    /// <remarks>
    /// If no pipe instance is held, this is a no-op and the pending flag is left unchanged.
    /// Exceptions from disconnecting or disposing are propagated to the caller.
    /// </remarks>
    public override void Close()
    {
        var stream = _stream;
        if (stream == null)
        {
            return;
        }

        try
        {
            try
            {
                if (stream.IsConnected)
                {
                    stream.Disconnect();
                }
            }
            finally
            {
                // Dispose even when Disconnect() throws, so the pipe handle is not leaked.
                stream.Dispose();
            }
        }
        finally
        {
            _stream = null;
            _isPending = false;
        }
    }

    /// <summary>
    /// Returns the pending flag: <c>true</c> from construction until <see cref="Close"/>
    /// tears down a pipe instance.
    /// </summary>
    public override bool IsClientPending()
    {
        return _isPending;
    }

    /// <summary>
    /// Makes sure <c>_stream</c> holds a pipe instance, creating one if necessary.
    /// </summary>
    /// <remarks>
    /// The natively created pipe (see <see cref="CreatePipeNative"/>) is preferred. If that
    /// helper returns no handle, a regular <see cref="NamedPipeServerStream"/> is created.
    /// If asynchronous mode turns out to be unimplemented, a synchronous pipe is created
    /// and asynchronous mode is switched off for subsequent instances.
    /// </remarks>
    private void EnsurePipeInstance()
    {
        if (_stream != null)
        {
            return;
        }

        const PipeDirection direction = PipeDirection.InOut;
        const int maxconn = NamedPipeServerStream.MaxAllowedServerInstances;
        const PipeTransmissionMode mode = PipeTransmissionMode.Byte;
        const int inbuf = 4096;
        const int outbuf = 4096;
        var options = _asyncMode ? PipeOptions.Asynchronous : PipeOptions.None;

        // TODO: "CreatePipeNative" is only a workaround, and there are basically two possible outcomes:
        // - once NamedPipeServerStream() gets a CTOR that supports pipesec, remove CreatePipeNative()
        // - if 31190 gets resolved before, use _stream.SetAccessControl(pipesec) instead of CreatePipeNative()
        try
        {
            var handle = CreatePipeNative(_pipeAddress, inbuf, outbuf);
            if ((handle != null) && (!handle.IsInvalid))
            {
                try
                {
                    _stream = new NamedPipeServerStream(direction, _asyncMode, false, handle);
                }
                catch
                {
                    // The stream never took ownership, so release the native handle ourselves.
                    handle.Dispose();
                    throw;
                }
            }
            else
            {
                handle?.Dispose();
                _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf/*, pipesec*/);
            }
        }
        catch (NotImplementedException) // Mono still does not support async, fallback to sync
        {
            if (!_asyncMode)
            {
                throw;
            }

            options &= (~PipeOptions.Asynchronous);
            _stream = new NamedPipeServerStream(_pipeAddress, direction, maxconn, mode, options, inbuf, outbuf);
            _asyncMode = false;
        }
    }

    #region CreatePipeNative workaround

    /// <summary>
    /// Managed counterpart of the Win32 SECURITY_ATTRIBUTES structure passed to
    /// <see cref="CreateNamedPipe"/>.
    /// </summary>
    [StructLayout(LayoutKind.Sequential)]
    internal class SECURITY_ATTRIBUTES
    {
        internal int nLength = 0;
        internal IntPtr lpSecurityDescriptor = IntPtr.Zero;
        internal int bInheritHandle = 0;
    }

    private const string Kernel32 = "kernel32.dll";

    /// <summary>
    /// P/Invoke declaration of the Win32 CreateNamedPipe function.
    /// Note the parameter order: the output buffer size precedes the input buffer size.
    /// </summary>
    [DllImport(Kernel32, SetLastError = true, CharSet = CharSet.Unicode)]
    internal static extern IntPtr CreateNamedPipe(
        string lpName, uint dwOpenMode, uint dwPipeMode,
        uint nMaxInstances, uint nOutBufferSize, uint nInBufferSize, uint nDefaultTimeOut,
        SECURITY_ATTRIBUTES pipeSecurityDescriptor
    );

    // Workaround: create the pipe via API call
    // we have to do it this way, since NamedPipeServerStream() for netstd still lacks a few CTORs
    // and _stream.SetAccessControl(pipesec); only keeps throwing ACCESS_DENIED errors at us
    // References:
    // - https://github.com/dotnet/corefx/issues/30170 (closed, continued in 31190)
    // - https://github.com/dotnet/corefx/issues/31190 System.IO.Pipes.AccessControl package does not work
    // - https://github.com/dotnet/corefx/issues/24040 NamedPipeServerStream: Provide support for WRITE_DAC
    // - https://github.com/dotnet/corefx/issues/34400 Have a mechanism for lower privileged user to connect to a privileged user's pipe

    /// <summary>
    /// Creates a duplex, overlapped, byte-mode named pipe through the Win32 API with a
    /// security descriptor that grants the current owner full control and everyone
    /// else read/write access.
    /// </summary>
    /// <param name="name">Pipe name without the <c>\\.\pipe\</c> prefix.</param>
    /// <param name="inbuf">Input buffer size in bytes.</param>
    /// <param name="outbuf">Output buffer size in bytes.</param>
    /// <returns>The pipe handle, or <c>null</c> when not running on Windows.</returns>
    /// <exception cref="Win32Exception">The native call did not return a valid handle.</exception>
    private static SafePipeHandle CreatePipeNative(string name, int inbuf, int outbuf)
    {
        if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            return null; // Windows only
        }

        var pinningHandle = new GCHandle();
        try
        {
            // owner gets full access, everyone else read/write
            var pipesec = new PipeSecurity();
            using (var currentIdentity = WindowsIdentity.GetCurrent())
            {
                var sidOwner = currentIdentity.Owner;
                var sidWorld = new SecurityIdentifier(WellKnownSidType.WorldSid, null);

                pipesec.SetOwner(sidOwner);
                pipesec.AddAccessRule(new PipeAccessRule(sidOwner, PipeAccessRights.FullControl, AccessControlType.Allow));
                pipesec.AddAccessRule(new PipeAccessRule(sidWorld, PipeAccessRights.ReadWrite, AccessControlType.Allow));
            }

            // Pin the binary security descriptor so its address stays valid during the native
            // call; the pinned address is used directly, so no unsafe code is required.
            byte[] sdBytes = pipesec.GetSecurityDescriptorBinaryForm();
            pinningHandle = GCHandle.Alloc(sdBytes, GCHandleType.Pinned);

            var secAttrs = new SECURITY_ATTRIBUTES
            {
                // the Win32 contract requires nLength to hold the size of the structure
                nLength = Marshal.SizeOf<SECURITY_ATTRIBUTES>(),
                lpSecurityDescriptor = pinningHandle.AddrOfPinnedObject()
            };

            // a bunch of constants we will need shortly
            const int PIPE_ACCESS_DUPLEX = 0x00000003;
            const int FILE_FLAG_OVERLAPPED = 0x40000000;
            const int WRITE_DAC = 0x00040000;
            const int PIPE_TYPE_BYTE = 0x00000000;
            const int PIPE_READMODE_BYTE = 0x00000000;
            const int PIPE_UNLIMITED_INSTANCES = 255;

            // create the pipe via API call (output buffer size comes before input buffer size)
            var rawHandle = CreateNamedPipe(
                @"\\.\pipe\" + name,
                PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC,
                PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
                PIPE_UNLIMITED_INSTANCES, (uint)outbuf, (uint)inbuf,
                5 * 1000,
                secAttrs
            );

            // capture the error code right away, before anything else can overwrite it
            var lastError = Marshal.GetLastWin32Error();

            // make a SafePipeHandle() from it
            var handle = new SafePipeHandle(rawHandle, true);
            if (handle.IsInvalid)
            {
                handle.Dispose();
                throw new Win32Exception(lastError);
            }

            // return it (to be packaged)
            return handle;
        }
        finally
        {
            if (pinningHandle.IsAllocated)
            {
                pinningHandle.Free();
            }
        }
    }

    #endregion

    /// <summary>
    /// Waits for a client to connect to the pipe and wraps the connected pipe in a client transport.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the wait-for-connection call.</param>
    /// <returns>A transport that owns the connected pipe instance.</returns>
    /// <exception cref="TTransportException">
    /// Rethrown as-is, or created with type NotOpen and the original message for any other
    /// exception. In both cases <see cref="Close"/> is called first.
    /// </exception>
    protected override async ValueTask<TTransport> AcceptImplementationAsync(CancellationToken cancellationToken)
    {
        try
        {
            EnsurePipeInstance();
            await _stream.WaitForConnectionAsync(cancellationToken);

            var trans = new ServerTransport(_stream, Configuration);
            _stream = null; // pass ownership to ServerTransport

            //_isPending = false;

            return trans;
        }
        catch (TTransportException)
        {
            Close();
            throw;
        }
        catch (Exception e)
        {
            Close();
            throw new TTransportException(TTransportException.ExceptionType.NotOpen, e.Message);
        }
    }

    /// <summary>
    /// Client-side endpoint of an accepted connection; owns the connected pipe stream.
    /// </summary>
    private class ServerTransport : TEndpointTransport
    {
        private readonly NamedPipeServerStream _pipeStream;

        /// <summary>
        /// Wraps an already connected pipe stream.
        /// </summary>
        /// <param name="stream">The connected pipe; disposed by <see cref="Close"/> or on dispose.</param>
        /// <param name="config">Configuration handed to the base class.</param>
        public ServerTransport(NamedPipeServerStream stream, TConfiguration config)
            : base(config)
        {
            _pipeStream = stream;
        }

        /// <summary>
        /// True when a pipe stream is present and reports being connected.
        /// </summary>
        public override bool IsOpen => _pipeStream != null && _pipeStream.IsConnected;

        /// <summary>
        /// Nothing to open (the pipe is already connected); only honors cancellation.
        /// </summary>
        public override Task OpenAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Disposes the pipe stream, if any.
        /// </summary>
        public override void Close()
        {
            _pipeStream?.Dispose();
        }

        /// <summary>
        /// Reads up to <paramref name="length"/> bytes from the pipe into <paramref name="buffer"/>.
        /// </summary>
        /// <returns>The number of bytes actually read.</returns>
        /// <exception cref="TTransportException">Type NotOpen when there is no pipe stream.</exception>
        public override async ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            if (_pipeStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // inherited helper; presumably enforces the configured message size limit (confirm in base class)
            CheckReadBytesAvailable(length);
#if NET5_0
            var numBytes = await _pipeStream.ReadAsync(buffer.AsMemory(offset, length), cancellationToken);
#elif NETSTANDARD2_1
            var numBytes = await _pipeStream.ReadAsync(new Memory<byte>(buffer, offset, length), cancellationToken);
#else
            var numBytes = await _pipeStream.ReadAsync(buffer, offset, length, cancellationToken);
#endif
            CountConsumedMessageBytes(numBytes);
            return numBytes;
        }

        /// <summary>
        /// Writes <paramref name="length"/> bytes from <paramref name="buffer"/> to the pipe,
        /// split into chunks of at most 15 * 4096 bytes.
        /// </summary>
        /// <exception cref="TTransportException">Type NotOpen when there is no pipe stream.</exception>
        public override async Task WriteAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
        {
            if (_pipeStream == null)
            {
                throw new TTransportException(TTransportException.ExceptionType.NotOpen);
            }

            // if necessary, send the data in chunks
            // there's a system limit around 0x10000 bytes that we hit otherwise
            // MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
            const int maxChunkSize = 15 * 4096; // 16 would exceed the limit

            var remaining = length;
            while (remaining > 0)
            {
                var chunkSize = Math.Min(maxChunkSize, remaining);
#if NET5_0
                await _pipeStream.WriteAsync(buffer.AsMemory(offset, chunkSize), cancellationToken);
#else
                await _pipeStream.WriteAsync(buffer, offset, chunkSize, cancellationToken);
#endif
                offset += chunkSize;
                remaining -= chunkSize;
            }
        }

        /// <summary>
        /// Honors cancellation and resets the consumed-message-size counter of the base class;
        /// the pipe stream itself is not flushed here.
        /// </summary>
        public override Task FlushAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();

            ResetConsumedMessageSize();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Disposes the pipe stream when called from an explicit dispose.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _pipeStream?.Dispose();
            }
        }
    }
}
}