blob: 635a8417859435c768b1152ce8d90d57b503e827 [file] [log] [blame]
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Thrift.Transport.Pipes;
{$WARN SYMBOL_PLATFORM OFF}
{$I Thrift.Defines.inc}
interface
uses
{$IFDEF OLD_UNIT_NAMES}
Windows, SysUtils, Math, AccCtrl, AclAPI, SyncObjs,
{$ELSE}
Winapi.Windows, System.SysUtils, System.Math, Winapi.AccCtrl, Winapi.AclAPI, System.SyncObjs,
{$ENDIF}
Thrift.Configuration,
Thrift.Transport,
Thrift.Utils,
Thrift.Stream;
const
DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT = 10; // default: fail fast on open
type
//--- Pipe Streams ---
TPipeStreamBase = class( TThriftStreamImpl)
strict protected
FPipe : THandle;
FTimeout : DWORD;
FOpenTimeOut : DWORD; // separate value to allow for fail-fast-on-open scenarios
FOverlapped : Boolean;
procedure Write( const pBuf : Pointer; offset, count : Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
//procedure Open; override; - see derived classes
procedure Close; override;
procedure Flush; override;
function ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
function ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; overload;
procedure WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer); overload;
procedure WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer); overload;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
constructor Create( aEnableOverlapped : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT
); reintroduce; overload;
destructor Destroy; override;
end;
TNamedPipeStreamImpl = class sealed( TPipeStreamBase)
strict private
FPipeName : string;
FShareMode : DWORD;
FSecurityAttribs : PSecurityAttributes;
strict protected
procedure Open; override;
public
constructor Create( const aPipeName : string;
const aEnableOverlapped : Boolean;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT
); reintroduce; overload;
end;
THandlePipeStreamImpl = class sealed( TPipeStreamBase)
strict private
FSrcHandle : THandle;
strict protected
procedure Open; override;
public
constructor Create( const aPipeHandle : THandle;
const aOwnsHandle, aEnableOverlapped : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT
); reintroduce; overload;
destructor Destroy; override;
end;
//--- Pipe Transports ---
IPipeTransport = interface( IStreamTransport)
['{5E05CC85-434F-428F-BFB2-856A168B5558}']
end;
TPipeTransportBase = class( TStreamTransportImpl, IPipeTransport)
strict protected
// ITransport
function GetIsOpen: Boolean; override;
procedure Open; override;
procedure Close; override;
end;
TNamedPipeTransportClientEndImpl = class( TPipeTransportBase)
public
// Named pipe constructors
constructor Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
constructor Create( const aPipeName : string;
const aShareMode: DWORD = 0;
const aSecurityAttributes: PSecurityAttributes = nil;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aOpenTimeOut : DWORD = DEFAULT_THRIFT_PIPE_OPEN_TIMEOUT;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
end;
TNamedPipeTransportServerEndImpl = class( TNamedPipeTransportClientEndImpl)
strict private
FHandle : THandle;
strict protected
// ITransport
procedure Close; override;
public
constructor Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
end;
TAnonymousPipeTransportImpl = class( TPipeTransportBase)
public
// Anonymous pipe constructor
constructor Create( const aPipeRead, aPipeWrite : THandle;
const aOwnsHandles : Boolean;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
end;
//--- Server Transports ---
IAnonymousPipeServerTransport = interface( IServerTransport)
['{7AEE6793-47B9-4E49-981A-C39E9108E9AD}']
// Server side anonymous pipe ends
function ReadHandle : THandle;
function WriteHandle : THandle;
// Client side anonymous pipe ends
function ClientAnonRead : THandle;
function ClientAnonWrite : THandle;
end;
INamedPipeServerTransport = interface( IServerTransport)
['{9DF9EE48-D065-40AF-8F67-D33037D3D960}']
function Handle : THandle;
end;
TPipeServerTransportBase = class( TServerTransportImpl)
strict protected
FStopServer : TEvent;
procedure InternalClose; virtual; abstract;
function QueryStopServer : Boolean;
public
constructor Create( const aConfig : IThriftConfiguration);
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
end;
TAnonymousPipeServerTransportImpl = class( TPipeServerTransportBase, IAnonymousPipeServerTransport)
strict private
FBufSize : DWORD;
// Server side anonymous pipe handles
FReadHandle,
FWriteHandle : THandle;
//Client side anonymous pipe handles
FClientAnonRead,
FClientAnonWrite : THandle;
FTimeOut: DWORD;
strict protected
function Accept(const fnAccepting: TProc): ITransport; override;
function CreateAnonPipe : Boolean;
// IAnonymousPipeServerTransport
function ReadHandle : THandle;
function WriteHandle : THandle;
function ClientAnonRead : THandle;
function ClientAnonWrite : THandle;
procedure InternalClose; override;
public
constructor Create( const aBufsize : Cardinal = 4096;
const aTimeOut : DWORD = DEFAULT_THRIFT_TIMEOUT;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
end;
TNamedPipeServerTransportImpl = class( TPipeServerTransportBase, INamedPipeServerTransport)
strict private
FPipeName : string;
FMaxConns : DWORD;
FBufSize : DWORD;
FTimeout : DWORD;
FHandle : THandle;
FConnected : Boolean;
strict protected
function Accept(const fnAccepting: TProc): ITransport; override;
function CreateNamedPipe : THandle;
function CreateTransportInstance : ITransport;
// INamedPipeServerTransport
function Handle : THandle;
procedure InternalClose; override;
public
constructor Create( const aPipename : string;
const aBufsize : Cardinal = 4096;
const aMaxConns : Cardinal = PIPE_UNLIMITED_INSTANCES;
const aTimeOut : Cardinal = INFINITE;
const aConfig : IThriftConfiguration = nil
); reintroduce; overload;
end;
implementation
procedure ClosePipeHandle( var hPipe : THandle);
begin
if hPipe <> INVALID_HANDLE_VALUE
then try
CloseHandle( hPipe);
finally
hPipe := INVALID_HANDLE_VALUE;
end;
end;
function DuplicatePipeHandle( const hSource : THandle) : THandle;
begin
if not DuplicateHandle( GetCurrentProcess, hSource,
GetCurrentProcess, @result,
0, FALSE, DUPLICATE_SAME_ACCESS)
then raise TTransportExceptionNotOpen.Create('DuplicateHandle: '+SysErrorMessage(GetLastError));
end;
{ TPipeStreamBase }
constructor TPipeStreamBase.Create( aEnableOverlapped : Boolean; const aTimeOut, aOpenTimeOut : DWORD);
begin
inherited Create;
FPipe := INVALID_HANDLE_VALUE;
FTimeout := aTimeOut;
FOpenTimeOut := aOpenTimeOut;
FOverlapped := aEnableOverlapped;
ASSERT( FTimeout > 0); // FOpenTimeout may be 0
end;
destructor TPipeStreamBase.Destroy;
begin
try
Close;
finally
inherited Destroy;
end;
end;
procedure TPipeStreamBase.Close;
begin
ClosePipeHandle( FPipe);
end;
procedure TPipeStreamBase.Flush;
begin
FlushFileBuffers( FPipe);
end;
function TPipeStreamBase.IsOpen: Boolean;
begin
result := (FPipe <> INVALID_HANDLE_VALUE);
end;
procedure TPipeStreamBase.Write( const pBuf : Pointer; offset, count : Integer);
begin
if FOverlapped
then WriteOverlapped( pBuf, offset, count)
else WriteDirect( pBuf, offset, count);
end;
function TPipeStreamBase.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
begin
if FOverlapped
then result := ReadOverlapped( pBuf, buflen, offset, count)
else result := ReadDirect( pBuf, buflen, offset, count);
end;
procedure TPipeStreamBase.WriteDirect( const pBuf : Pointer; offset: Integer; count: Integer);
var cbWritten, nBytes : DWORD;
pData : PByte;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
// if necessary, send the data in chunks
// there's a system limit around 0x10000 bytes that we hit otherwise
// MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
nBytes := Min( 15*4096, count); // 16 would exceed the limit
pData := pBuf;
Inc( pData, offset);
while nBytes > 0 do begin
if not WriteFile( FPipe, pData^, nBytes, cbWritten, nil)
then raise TTransportExceptionNotOpen.Create('Write to pipe failed');
Inc( pData, cbWritten);
Dec( count, cbWritten);
nBytes := Min( nBytes, count);
end;
end;
procedure TPipeStreamBase.WriteOverlapped( const pBuf : Pointer; offset: Integer; count: Integer);
var cbWritten, dwWait, dwError, nBytes : DWORD;
overlapped : IOverlappedHelper;
pData : PByte;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called write on non-open pipe');
// if necessary, send the data in chunks
// there's a system limit around 0x10000 bytes that we hit otherwise
// MSDN: "Pipe write operations across a network are limited to 65,535 bytes per write. For more information regarding pipes, see the Remarks section."
nBytes := Min( 15*4096, count); // 16 would exceed the limit
pData := pBuf;
Inc( pData, offset);
while nBytes > 0 do begin
overlapped := TOverlappedHelperImpl.Create;
if not WriteFile( FPipe, pData^, nBytes, cbWritten, overlapped.OverlappedPtr)
then begin
dwError := GetLastError;
case dwError of
ERROR_IO_PENDING : begin
dwWait := overlapped.WaitFor(FTimeout);
if (dwWait = WAIT_TIMEOUT) then begin
CancelIo( FPipe); // prevents possible AV on invalid overlapped ptr
raise TTransportExceptionTimedOut.Create('Pipe write timed out');
end;
if (dwWait <> WAIT_OBJECT_0)
or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbWritten, TRUE)
then raise TTransportExceptionUnknown.Create('Pipe write error');
end;
else
raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
end;
ASSERT( DWORD(nBytes) = cbWritten);
Inc( pData, cbWritten);
Dec( count, cbWritten);
nBytes := Min( nBytes, count);
end;
end;
function TPipeStreamBase.ReadDirect( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var cbRead, dwErr, nRemaining : DWORD;
bytes, retries : LongInt;
bOk : Boolean;
pData : PByte;
const INTERVAL = 10; // ms
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
// MSDN: Handle can be a handle to a named pipe instance,
// or it can be a handle to the read end of an anonymous pipe,
// The handle must have GENERIC_READ access to the pipe.
if FTimeOut <> INFINITE then begin
retries := Max( 1, Round( 1.0 * FTimeOut / INTERVAL));
while TRUE do begin
if not PeekNamedPipe( FPipe, nil, 0, nil, @bytes, nil) then begin
dwErr := GetLastError;
if (dwErr = ERROR_INVALID_HANDLE)
or (dwErr = ERROR_BROKEN_PIPE)
or (dwErr = ERROR_PIPE_NOT_CONNECTED)
then begin
result := 0; // other side closed the pipe
Exit;
end;
end
else if bytes > 0 then begin
Break; // there are data
end;
Dec( retries);
if retries > 0
then Sleep( INTERVAL)
else raise TTransportExceptionTimedOut.Create('Pipe read timed out');
end;
end;
result := 0;
nRemaining := count;
pData := pBuf;
Inc( pData, offset);
while nRemaining > 0 do begin
// read the data (or block INFINITE-ly)
bOk := ReadFile( FPipe, pData^, nRemaining, cbRead, nil);
if (not bOk) and (GetLastError() <> ERROR_MORE_DATA)
then Break; // No more data, possibly because client disconnected.
Dec( nRemaining, cbRead);
Inc( pData, cbRead);
Inc( result, cbRead);
end;
end;
function TPipeStreamBase.ReadOverlapped( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var cbRead, dwWait, dwError, nRemaining : DWORD;
bOk : Boolean;
overlapped : IOverlappedHelper;
pData : PByte;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('Called read on non-open pipe');
result := 0;
nRemaining := count;
pData := pBuf;
Inc( pData, offset);
while nRemaining > 0 do begin
overlapped := TOverlappedHelperImpl.Create;
// read the data
bOk := ReadFile( FPipe, pData^, nRemaining, cbRead, overlapped.OverlappedPtr);
if not bOk then begin
dwError := GetLastError;
case dwError of
ERROR_IO_PENDING : begin
dwWait := overlapped.WaitFor(FTimeout);
if (dwWait = WAIT_TIMEOUT) then begin
CancelIo( FPipe); // prevents possible AV on invalid overlapped ptr
raise TTransportExceptionTimedOut.Create('Pipe read timed out');
end;
if (dwWait <> WAIT_OBJECT_0)
or not GetOverlappedResult( FPipe, overlapped.Overlapped, cbRead, TRUE)
then raise TTransportExceptionUnknown.Create('Pipe read error');
end;
else
raise TTransportExceptionUnknown.Create(SysErrorMessage(dwError));
end;
end;
ASSERT( cbRead > 0); // see TTransportImpl.ReadAll()
ASSERT( cbRead <= DWORD(nRemaining));
Dec( nRemaining, cbRead);
Inc( pData, cbRead);
Inc( result, cbRead);
end;
end;
function TPipeStreamBase.ToArray: TBytes;
var bytes : LongInt;
begin
SetLength( result, 0);
bytes := 0;
if IsOpen
and PeekNamedPipe( FPipe, nil, 0, nil, @bytes, nil)
and (bytes > 0)
then begin
SetLength( result, bytes);
Read( result, 0, bytes);
end;
end;
{ TNamedPipeStreamImpl }
constructor TNamedPipeStreamImpl.Create( const aPipeName : string;
const aEnableOverlapped : Boolean;
const aShareMode: DWORD;
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut, aOpenTimeOut : DWORD);
begin
inherited Create( aEnableOverlapped, aTimeOut, aOpenTimeOut);
FPipeName := aPipeName;
FShareMode := aShareMode;
FSecurityAttribs := aSecurityAttributes;
if Copy(FPipeName,1,2) <> '\\'
then FPipeName := '\\.\pipe\' + FPipeName; // assume localhost
end;
procedure TNamedPipeStreamImpl.Open;
var hPipe : THandle;
retries, timeout, dwErr : DWORD;
const INTERVAL = 10; // ms
begin
if IsOpen then Exit;
retries := Max( 1, Round( 1.0 * FOpenTimeOut / INTERVAL));
timeout := FOpenTimeOut;
// if the server hasn't gotten to the point where the pipe has been created, at least wait the timeout
// According to MSDN, if no instances of the specified named pipe exist, the WaitNamedPipe function
// returns IMMEDIATELY, regardless of the time-out value.
// Always use INTERVAL, since WaitNamedPipe(0) defaults to some other value
while not WaitNamedPipe( PChar(FPipeName), INTERVAL) do begin
dwErr := GetLastError;
if dwErr <> ERROR_FILE_NOT_FOUND
then raise TTransportExceptionNotOpen.Create('Unable to open pipe, '+SysErrorMessage(dwErr));
if timeout <> INFINITE then begin
if (retries > 0)
then Dec(retries)
else raise TTransportExceptionNotOpen.Create('Unable to open pipe, timed out');
end;
Sleep(INTERVAL)
end;
// open that thingy
hPipe := CreateFile( PChar( FPipeName),
GENERIC_READ or GENERIC_WRITE,
FShareMode, // sharing
FSecurityAttribs, // security attributes
OPEN_EXISTING, // opens existing pipe
FILE_FLAG_OVERLAPPED or FILE_FLAG_WRITE_THROUGH, // async+fast, please
0); // no template file
if hPipe = INVALID_HANDLE_VALUE
then raise TTransportExceptionNotOpen.Create('Unable to open pipe, '+SysErrorMessage(GetLastError));
// everything fine
FPipe := hPipe;
end;
{ THandlePipeStreamImpl }
constructor THandlePipeStreamImpl.Create( const aPipeHandle : THandle;
const aOwnsHandle, aEnableOverlapped : Boolean;
const aTimeOut : DWORD);
begin
inherited Create( aEnableOverlapped, aTimeout, aTimeout);
if aOwnsHandle
then FSrcHandle := aPipeHandle
else FSrcHandle := DuplicatePipeHandle( aPipeHandle);
Open;
end;
destructor THandlePipeStreamImpl.Destroy;
begin
try
ClosePipeHandle( FSrcHandle);
finally
inherited Destroy;
end;
end;
procedure THandlePipeStreamImpl.Open;
begin
if not IsOpen
then FPipe := DuplicatePipeHandle( FSrcHandle);
end;
{ TPipeTransportBase }
function TPipeTransportBase.GetIsOpen: Boolean;
begin
result := (FInputStream <> nil) and (FInputStream.IsOpen)
and (FOutputStream <> nil) and (FOutputStream.IsOpen);
end;
procedure TPipeTransportBase.Open;
begin
FInputStream.Open;
FOutputStream.Open;
end;
procedure TPipeTransportBase.Close;
begin
FInputStream.Close;
FOutputStream.Close;
end;
{ TNamedPipeTransportClientEndImpl }
constructor TNamedPipeTransportClientEndImpl.Create( const aPipeName : string;
const aShareMode: DWORD;
const aSecurityAttributes: PSecurityAttributes;
const aTimeOut, aOpenTimeOut : DWORD;
const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
FInputStream := TNamedPipeStreamImpl.Create( aPipeName, TRUE, aShareMode, aSecurityAttributes, aTimeOut, aOpenTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
constructor TNamedPipeTransportClientEndImpl.Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
inherited Create( nil, nil, aConfig);
FInputStream := THandlePipeStreamImpl.Create( aPipe, aOwnsHandle, TRUE, aTimeOut);
FOutputStream := FInputStream; // true for named pipes
end;
{ TNamedPipeTransportServerEndImpl }
constructor TNamedPipeTransportServerEndImpl.Create( const aPipe : THandle;
const aOwnsHandle : Boolean;
const aTimeOut : DWORD;
const aConfig : IThriftConfiguration);
// Named pipe constructor
begin
FHandle := DuplicatePipeHandle( aPipe);
inherited Create( aPipe, aOwnsHandle, aTimeout, aConfig);
end;
procedure TNamedPipeTransportServerEndImpl.Close;
begin
FlushFileBuffers( FHandle);
DisconnectNamedPipe( FHandle); // force client off the pipe
ClosePipeHandle( FHandle);
inherited Close;
end;
{ TAnonymousPipeTransportImpl }
constructor TAnonymousPipeTransportImpl.Create( const aPipeRead, aPipeWrite : THandle;
const aOwnsHandles : Boolean;
const aTimeOut : DWORD;
const aConfig : IThriftConfiguration);
// Anonymous pipe constructor
begin
inherited Create( nil, nil, aConfig);
// overlapped is not supported with AnonPipes, see MSDN
FInputStream := THandlePipeStreamImpl.Create( aPipeRead, aOwnsHandles, FALSE, aTimeout);
FOutputStream := THandlePipeStreamImpl.Create( aPipeWrite, aOwnsHandles, FALSE, aTimeout);
end;
{ TPipeServerTransportBase }
constructor TPipeServerTransportBase.Create( const aConfig : IThriftConfiguration);
begin
inherited Create( aConfig);
FStopServer := TEvent.Create(nil,TRUE,FALSE,''); // manual reset
end;
destructor TPipeServerTransportBase.Destroy;
begin
try
FreeAndNil( FStopServer);
finally
inherited Destroy;
end;
end;
function TPipeServerTransportBase.QueryStopServer : Boolean;
begin
result := (FStopServer = nil)
or (FStopServer.WaitFor(0) <> wrTimeout);
end;
procedure TPipeServerTransportBase.Listen;
begin
FStopServer.ResetEvent;
end;
procedure TPipeServerTransportBase.Close;
begin
FStopServer.SetEvent;
InternalClose;
end;
{ TAnonymousPipeServerTransportImpl }
constructor TAnonymousPipeServerTransportImpl.Create( const aBufsize : Cardinal;
const aTimeOut : DWORD;
const aConfig : IThriftConfiguration);
// Anonymous pipe CTOR
begin
inherited Create(aConfig);
FBufsize := aBufSize;
FReadHandle := INVALID_HANDLE_VALUE;
FWriteHandle := INVALID_HANDLE_VALUE;
FClientAnonRead := INVALID_HANDLE_VALUE;
FClientAnonWrite := INVALID_HANDLE_VALUE;
FTimeOut := aTimeOut;
// The anonymous pipe needs to be created first so that the server can
// pass the handles on to the client before the serve (acceptImpl)
// blocking call.
if not CreateAnonPipe
then raise TTransportExceptionNotOpen.Create(ClassName+'.Create() failed');
end;
function TAnonymousPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
var buf : Byte;
br : DWORD;
begin
if Assigned(fnAccepting)
then fnAccepting();
// This 0-byte read serves merely as a blocking call.
if not ReadFile( FReadHandle, buf, 0, br, nil)
and (GetLastError() <> ERROR_MORE_DATA)
then raise TTransportExceptionNotOpen.Create('TServerPipe unable to initiate pipe communication');
// create the transport impl
result := TAnonymousPipeTransportImpl.Create( FReadHandle, FWriteHandle, FALSE, FTimeOut, Configuration);
end;
procedure TAnonymousPipeServerTransportImpl.InternalClose;
begin
ClosePipeHandle( FReadHandle);
ClosePipeHandle( FWriteHandle);
ClosePipeHandle( FClientAnonRead);
ClosePipeHandle( FClientAnonWrite);
end;
function TAnonymousPipeServerTransportImpl.ReadHandle : THandle;
begin
result := FReadHandle;
end;
function TAnonymousPipeServerTransportImpl.WriteHandle : THandle;
begin
result := FWriteHandle;
end;
function TAnonymousPipeServerTransportImpl.ClientAnonRead : THandle;
begin
result := FClientAnonRead;
end;
function TAnonymousPipeServerTransportImpl.ClientAnonWrite : THandle;
begin
result := FClientAnonWrite;
end;
function TAnonymousPipeServerTransportImpl.CreateAnonPipe : Boolean;
var sd : PSECURITY_DESCRIPTOR;
sa : SECURITY_ATTRIBUTES; //TSecurityAttributes;
hCAR, hPipeW, hCAW, hPipe : THandle;
begin
sd := PSECURITY_DESCRIPTOR( LocalAlloc( LPTR,SECURITY_DESCRIPTOR_MIN_LENGTH));
try
Win32Check( InitializeSecurityDescriptor( sd, SECURITY_DESCRIPTOR_REVISION));
Win32Check( SetSecurityDescriptorDacl( sd, TRUE, nil, FALSE));
sa.nLength := sizeof( sa);
sa.lpSecurityDescriptor := sd;
sa.bInheritHandle := TRUE; //allow passing handle to child
Result := CreatePipe( hCAR, hPipeW, @sa, FBufSize); //create stdin pipe
if not Result then begin //create stdin pipe
raise TTransportExceptionNotOpen.Create('TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
Exit;
end;
Result := CreatePipe( hPipe, hCAW, @sa, FBufSize); //create stdout pipe
if not Result then begin //create stdout pipe
CloseHandle( hCAR);
CloseHandle( hPipeW);
raise TTransportExceptionNotOpen.Create('TServerPipe CreatePipe (anon) failed, '+SysErrorMessage(GetLastError));
Exit;
end;
FClientAnonRead := hCAR;
FClientAnonWrite := hCAW;
FReadHandle := hPipe;
FWriteHandle := hPipeW;
finally
if sd <> nil then LocalFree( Cardinal(sd));
end;
end;
{ TNamedPipeServerTransportImpl }
constructor TNamedPipeServerTransportImpl.Create( const aPipename : string;
const aBufsize, aMaxConns, aTimeOut : Cardinal;
const aConfig : IThriftConfiguration);
// Named Pipe CTOR
begin
inherited Create( aConfig);
FPipeName := aPipename;
FBufsize := aBufSize;
FMaxConns := Max( 1, Min( PIPE_UNLIMITED_INSTANCES, aMaxConns));
FHandle := INVALID_HANDLE_VALUE;
FTimeout := aTimeOut;
FConnected := FALSE;
ASSERT( FTimeout > 0);
if Copy(FPipeName,1,2) <> '\\'
then FPipeName := '\\.\pipe\' + FPipeName; // assume localhost
end;
function TNamedPipeServerTransportImpl.Accept(const fnAccepting: TProc): ITransport;
var dwError, dwWait, dwDummy : DWORD;
overlapped : IOverlappedHelper;
handles : array[0..1] of THandle;
begin
overlapped := TOverlappedHelperImpl.Create;
ASSERT( not FConnected);
CreateNamedPipe;
while not FConnected do begin
if QueryStopServer then begin
InternalClose;
Abort;
end;
if Assigned(fnAccepting)
then fnAccepting();
// Wait for the client to connect; if it succeeds, the
// function returns a nonzero value. If the function returns
// zero, GetLastError should return ERROR_PIPE_CONNECTED.
if ConnectNamedPipe( Handle, overlapped.OverlappedPtr) then begin
FConnected := TRUE;
Break;
end;
// ConnectNamedPipe() returns FALSE for OverlappedIO, even if connected.
// We have to check GetLastError() explicitly to find out
dwError := GetLastError;
case dwError of
ERROR_PIPE_CONNECTED : begin
FConnected := not QueryStopServer; // special case: pipe immediately connected
end;
ERROR_IO_PENDING : begin
handles[0] := overlapped.WaitHandle;
handles[1] := FStopServer.Handle;
dwWait := WaitForMultipleObjects( 2, @handles, FALSE, FTimeout);
FConnected := (dwWait = WAIT_OBJECT_0)
and GetOverlappedResult( Handle, overlapped.Overlapped, dwDummy, TRUE)
and not QueryStopServer;
end;
else
InternalClose;
raise TTransportExceptionNotOpen.Create('Client connection failed');
end;
end;
// create the transport impl
result := CreateTransportInstance;
end;
function TNamedPipeServerTransportImpl.CreateTransportInstance : ITransport;
// create the transport impl
var hPipe : THandle;
begin
hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
try
FConnected := FALSE;
result := TNamedPipeTransportServerEndImpl.Create( hPipe, TRUE, FTimeout, Configuration);
except
ClosePipeHandle(hPipe);
raise;
end;
end;
procedure TNamedPipeServerTransportImpl.InternalClose;
var hPipe : THandle;
begin
hPipe := THandle( InterlockedExchangePointer( Pointer(FHandle), Pointer(INVALID_HANDLE_VALUE)));
if hPipe = INVALID_HANDLE_VALUE then Exit;
try
if FConnected
then FlushFileBuffers( hPipe)
else CancelIo( hPipe);
DisconnectNamedPipe( hPipe);
finally
ClosePipeHandle( hPipe);
FConnected := FALSE;
end;
end;
function TNamedPipeServerTransportImpl.Handle : THandle;
begin
{$IFDEF WIN64}
result := THandle( InterlockedExchangeAdd64( Int64(FHandle), 0));
{$ELSE}
result := THandle( InterlockedExchangeAdd( Integer(FHandle), 0));
{$ENDIF}
end;
function TNamedPipeServerTransportImpl.CreateNamedPipe : THandle;
var SIDAuthWorld : SID_IDENTIFIER_AUTHORITY ;
everyone_sid : PSID;
ea : EXPLICIT_ACCESS;
acl : PACL;
sd : PSECURITY_DESCRIPTOR;
sa : SECURITY_ATTRIBUTES;
const
SECURITY_WORLD_SID_AUTHORITY : TSIDIdentifierAuthority = (Value : (0,0,0,0,0,1));
SECURITY_WORLD_RID = $00000000;
begin
sd := nil;
everyone_sid := nil;
try
ASSERT( (FHandle = INVALID_HANDLE_VALUE) and not FConnected);
// Windows - set security to allow non-elevated apps
// to access pipes created by elevated apps.
SIDAuthWorld := SECURITY_WORLD_SID_AUTHORITY;
AllocateAndInitializeSid( SIDAuthWorld, 1, SECURITY_WORLD_RID, 0, 0, 0, 0, 0, 0, 0, everyone_sid);
ZeroMemory( @ea, SizeOf(ea));
ea.grfAccessPermissions := GENERIC_ALL; //SPECIFIC_RIGHTS_ALL or STANDARD_RIGHTS_ALL;
ea.grfAccessMode := SET_ACCESS;
ea.grfInheritance := NO_INHERITANCE;
ea.Trustee.TrusteeForm := TRUSTEE_IS_SID;
ea.Trustee.TrusteeType := TRUSTEE_IS_WELL_KNOWN_GROUP;
ea.Trustee.ptstrName := PChar(everyone_sid);
acl := nil;
SetEntriesInAcl( 1, @ea, nil, acl);
sd := PSECURITY_DESCRIPTOR( LocalAlloc( LPTR,SECURITY_DESCRIPTOR_MIN_LENGTH));
Win32Check( InitializeSecurityDescriptor( sd, SECURITY_DESCRIPTOR_REVISION));
Win32Check( SetSecurityDescriptorDacl( sd, TRUE, acl, FALSE));
sa.nLength := SizeOf(sa);
sa.lpSecurityDescriptor := sd;
sa.bInheritHandle := FALSE;
// Create an instance of the named pipe
{$IFDEF OLD_UNIT_NAMES}
result := Windows.CreateNamedPipe(
{$ELSE}
result := Winapi.Windows.CreateNamedPipe(
{$ENDIF}
PChar( FPipeName), // pipe name
PIPE_ACCESS_DUPLEX or // read/write access
FILE_FLAG_OVERLAPPED, // async mode
PIPE_TYPE_BYTE or // byte type pipe
PIPE_READMODE_BYTE, // byte read mode
FMaxConns, // max. instances
FBufSize, // output buffer size
FBufSize, // input buffer size
FTimeout, // time-out, see MSDN
@sa // default security attribute
);
if( result <> INVALID_HANDLE_VALUE)
then InterlockedExchangePointer( Pointer(FHandle), Pointer(result))
else raise TTransportExceptionNotOpen.Create('CreateNamedPipe() failed ' + IntToStr(GetLastError));
finally
if sd <> nil then LocalFree( Cardinal( sd));
if acl <> nil then LocalFree( Cardinal( acl));
if everyone_sid <> nil then FreeSid(everyone_sid);
end;
end;
end.