blob: 6a69d93380f2ac1da4d7e6e26eb04d398c8be53f [file] [log] [blame]
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Thrift.Transport;
{$I Thrift.Defines.inc}
{$SCOPEDENUMS ON}
interface
uses
Classes,
SysUtils,
Math,
Generics.Collections,
{$IFDEF OLD_UNIT_NAMES}
WinSock, Sockets,
{$ELSE}
Winapi.WinSock,
{$IFDEF OLD_SOCKETS}
Web.Win.Sockets,
{$ELSE}
Thrift.Socket,
{$ENDIF}
{$ENDIF}
Thrift.Configuration,
Thrift.Collections,
Thrift.Exception,
Thrift.Utils,
Thrift.WinHTTP,
Thrift.Stream;
const
DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; // 100 MB
DEFAULT_THRIFT_TIMEOUT = 5 * 1000; // ms
type
IStreamTransport = interface;
ITransport = interface
['{52F81383-F880-492F-8AA7-A66B85B93D6B}']
function GetIsOpen: Boolean;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean;
procedure Open;
procedure Close;
function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload;
function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload;
procedure Write( const buf: TBytes); overload;
procedure Write( const buf: TBytes; off: Integer; len: Integer); overload;
procedure Write( const pBuf : Pointer; off, len : Integer); overload;
procedure Write( const pBuf : Pointer; len : Integer); overload;
procedure Flush;
function Configuration : IThriftConfiguration;
function MaxMessageSize : Integer;
procedure ResetConsumedMessageSize( const knownSize : Int64 = -1);
procedure CheckReadBytesAvailable( const numBytes : Int64);
procedure UpdateKnownMessageSize( const size : Int64);
end;
TTransportBase = class abstract( TInterfacedObject)
strict protected
function GetIsOpen: Boolean; virtual; abstract;
property IsOpen: Boolean read GetIsOpen;
function Peek: Boolean; virtual;
procedure Open(); virtual; abstract;
procedure Close(); virtual; abstract;
function Read(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
function Read(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual; abstract;
function ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer; overload; inline;
function ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; overload; virtual;
procedure Write( const buf: TBytes); overload; inline;
procedure Write( const buf: TBytes; off: Integer; len: Integer); overload; inline;
procedure Write( const pBuf : Pointer; len : Integer); overload; inline;
procedure Write( const pBuf : Pointer; off, len : Integer); overload; virtual; abstract;
procedure Flush; virtual;
function Configuration : IThriftConfiguration; virtual; abstract;
procedure UpdateKnownMessageSize( const size : Int64); virtual; abstract;
end;
// base class for all endpoint transports, e.g. sockets, pipes or HTTP
TEndpointTransportBase = class abstract( TTransportBase, ITransport)
strict private
FRemainingMessageSize : Int64;
FKnownMessageSize : Int64;
FConfiguration : IThriftConfiguration;
strict protected
function Configuration : IThriftConfiguration; override;
function MaxMessageSize : Integer;
property RemainingMessageSize : Int64 read FRemainingMessageSize;
property KnownMessageSize : Int64 read FKnownMessageSize;
procedure ResetConsumedMessageSize( const newSize : Int64 = -1);
procedure UpdateKnownMessageSize(const size : Int64); override;
procedure CheckReadBytesAvailable(const numBytes : Int64); inline;
procedure CountConsumedMessageBytes(const numBytes : Int64); inline;
public
constructor Create( const aConfig : IThriftConfiguration); reintroduce;
end;
// base class for all layered transports, e.g. framed
TLayeredTransportBase<T : ITransport> = class abstract( TTransportBase, ITransport)
strict private
FTransport : T;
strict protected
property InnerTransport : T read FTransport;
function GetUnderlyingTransport: ITransport;
function Configuration : IThriftConfiguration; override;
procedure UpdateKnownMessageSize( const size : Int64); override;
function MaxMessageSize : Integer; inline;
procedure ResetConsumedMessageSize( const knownSize : Int64 = -1); inline;
procedure CheckReadBytesAvailable( const numBytes : Int64); virtual;
public
constructor Create( const aTransport: T); reintroduce;
property UnderlyingTransport: ITransport read GetUnderlyingTransport;
end;
TTransportException = class abstract( TException)
public
type
TExceptionType = (
Unknown,
NotOpen,
AlreadyOpen,
TimedOut,
EndOfFile,
BadArgs,
Interrupted,
CorruptedData
);
strict protected
constructor HiddenCreate(const Msg: string);
class function GetType: TExceptionType; virtual; abstract;
public
class function Create( aType: TExceptionType): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
class function Create( const msg: string): TTransportException; reintroduce; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
class function Create( aType: TExceptionType; const msg: string): TTransportException; overload; deprecated 'Use specialized TTransportException types (or regenerate from IDL)';
property Type_: TExceptionType read GetType;
end;
// Needed to remove deprecation warning
TTransportExceptionSpecialized = class abstract (TTransportException)
public
constructor Create(const Msg: string);
end;
TTransportExceptionUnknown = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionNotOpen = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionAlreadyOpen = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionTimedOut = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionEndOfFile = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionBadArgs = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionInterrupted = class (TTransportExceptionSpecialized)
strict protected
class function GetType: TTransportException.TExceptionType; override;
end;
TTransportExceptionCorruptedData = class (TTransportExceptionSpecialized)
protected
class function GetType: TTransportException.TExceptionType; override;
end;
TSecureProtocol = (
SSL_2, SSL_3, TLS_1, // outdated, for compatibilty only
TLS_1_1, TLS_1_2 // secure (as of today)
);
TSecureProtocols = set of TSecureProtocol;
IHTTPClient = interface( ITransport )
['{7BF615DD-8680-4004-A5B2-88947BA3BA3D}']
procedure SetDnsResolveTimeout(const Value: Integer);
function GetDnsResolveTimeout: Integer;
procedure SetConnectionTimeout(const Value: Integer);
function GetConnectionTimeout: Integer;
procedure SetSendTimeout(const Value: Integer);
function GetSendTimeout: Integer;
procedure SetReadTimeout(const Value: Integer);
function GetReadTimeout: Integer;
function GetCustomHeaders: IThriftDictionary<string,string>;
procedure SendRequest;
function GetSecureProtocols : TSecureProtocols;
procedure SetSecureProtocols( const value : TSecureProtocols);
property DnsResolveTimeout: Integer read GetDnsResolveTimeout write SetDnsResolveTimeout;
property ConnectionTimeout: Integer read GetConnectionTimeout write SetConnectionTimeout;
property SendTimeout: Integer read GetSendTimeout write SetSendTimeout;
property ReadTimeout: Integer read GetReadTimeout write SetReadTimeout;
property CustomHeaders: IThriftDictionary<string,string> read GetCustomHeaders;
property SecureProtocols : TSecureProtocols read GetSecureProtocols write SetSecureProtocols;
end;
IServerTransport = interface
['{FA01363F-6B40-482F-971E-4A085535EFC8}']
procedure Listen;
procedure Close;
function Accept( const fnAccepting: TProc): ITransport;
function Configuration : IThriftConfiguration;
end;
TServerTransportImpl = class( TInterfacedObject, IServerTransport)
strict private
FConfig : IThriftConfiguration;
strict protected
function Configuration : IThriftConfiguration;
procedure Listen; virtual; abstract;
procedure Close; virtual; abstract;
function Accept( const fnAccepting: TProc): ITransport; virtual; abstract;
public
constructor Create( const aConfig : IThriftConfiguration);
end;
ITransportFactory = interface
['{DD809446-000F-49E1-9BFF-E0D0DC76A9D7}']
function GetTransport( const aTransport: ITransport): ITransport;
end;
TTransportFactoryImpl = class ( TInterfacedObject, ITransportFactory)
strict protected
function GetTransport( const aTransport: ITransport): ITransport; virtual;
end;
TTcpSocketStreamImpl = class( TThriftStreamImpl)
{$IFDEF OLD_SOCKETS}
strict private type
TWaitForData = ( wfd_HaveData, wfd_Timeout, wfd_Error);
strict private
FTcpClient : TCustomIpClient;
FTimeout : Integer;
function Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
TimeOut: Integer; var wsaError : Integer): Integer;
function WaitForData( TimeOut : Integer; pBuf : Pointer; DesiredBytes: Integer;
var wsaError, bytesReady : Integer): TWaitForData;
{$ELSE}
FTcpClient: TSocket;
strict protected const
SLEEP_TIME = 200;
{$ENDIF}
strict protected
procedure Write( const pBuf : Pointer; offset, count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
public
{$IFDEF OLD_SOCKETS}
constructor Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer = DEFAULT_THRIFT_TIMEOUT);
{$ELSE}
constructor Create( const aTcpClient: TSocket; const aTimeout : Longword = DEFAULT_THRIFT_TIMEOUT);
{$ENDIF}
end;
IStreamTransport = interface( ITransport )
['{A8479B47-2A3E-4421-A9A0-D5A9EDCC634A}']
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
property InputStream : IThriftStream read GetInputStream;
property OutputStream : IThriftStream read GetOutputStream;
end;
TStreamTransportImpl = class( TEndpointTransportBase, IStreamTransport)
strict protected
FInputStream : IThriftStream;
FOutputStream : IThriftStream;
strict protected
function GetIsOpen: Boolean; override;
function GetInputStream: IThriftStream;
function GetOutputStream: IThriftStream;
strict protected
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
public
constructor Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration = nil); reintroduce;
destructor Destroy; override;
property InputStream : IThriftStream read GetInputStream;
property OutputStream : IThriftStream read GetOutputStream;
end;
TBufferedStreamImpl = class( TThriftStreamImpl)
strict private
FStream : IThriftStream;
FBufSize : Integer;
FReadBuffer : TMemoryStream;
FWriteBuffer : TMemoryStream;
strict protected
procedure Write( const pBuf : Pointer; offset: Integer; count: Integer); override;
function Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer; override;
procedure Open; override;
procedure Close; override;
procedure Flush; override;
function IsOpen: Boolean; override;
function ToArray: TBytes; override;
function Size : Int64; override;
function Position : Int64; override;
public
constructor Create( const aStream: IThriftStream; const aBufSize : Integer);
destructor Destroy; override;
end;
TServerSocketImpl = class( TServerTransportImpl)
strict private
{$IFDEF OLD_SOCKETS}
FServer : TTcpServer;
FPort : Integer;
FClientTimeout : Integer;
{$ELSE}
FServer: TServerSocket;
{$ENDIF}
FUseBufferedSocket : Boolean;
FOwnsServer : Boolean;
strict protected
function Accept( const fnAccepting: TProc) : ITransport; override;
public
{$IFDEF OLD_SOCKETS}
constructor Create( const aServer: TTcpServer; const aClientTimeout : Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
constructor Create( const aPort: Integer; const aClientTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
{$ELSE}
constructor Create( const aServer: TServerSocket; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
constructor Create( const aPort: Integer; const aClientTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; aUseBufferedSockets: Boolean = FALSE; const aConfig : IThriftConfiguration = nil); overload;
{$ENDIF}
destructor Destroy; override;
procedure Listen; override;
procedure Close; override;
end;
TBufferedTransportImpl = class( TLayeredTransportBase<IStreamTransport>)
strict private
FInputBuffer : IThriftStream;
FOutputBuffer : IThriftStream;
FBufSize : Integer;
procedure InitBuffers;
strict protected
function GetIsOpen: Boolean; override;
procedure Flush; override;
public
type
TFactory = class( TTransportFactoryImpl )
public
function GetTransport( const aTransport: ITransport): ITransport; override;
end;
constructor Create( const aTransport : IStreamTransport; const aBufSize: Integer = 1024);
procedure Open(); override;
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure CheckReadBytesAvailable( const value : Int64); override;
property IsOpen: Boolean read GetIsOpen;
end;
TSocketImpl = class(TStreamTransportImpl)
strict private
{$IFDEF OLD_SOCKETS}
FClient : TCustomIpClient;
{$ELSE}
FClient: TSocket;
{$ENDIF}
FOwnsClient : Boolean;
FHost : string;
FPort : Integer;
{$IFDEF OLD_SOCKETS}
FTimeout : Integer;
{$ELSE}
FTimeout : Longword;
{$ENDIF}
procedure InitSocket;
strict protected
function GetIsOpen: Boolean; override;
public
{$IFDEF OLD_SOCKETS}
constructor Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Integer = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ELSE}
constructor Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration = nil); overload;
constructor Create( const aHost: string; const aPort: Integer; const aTimeout: Longword = DEFAULT_THRIFT_TIMEOUT; const aConfig : IThriftConfiguration = nil); overload;
{$ENDIF}
destructor Destroy; override;
procedure Open; override;
procedure Close; override;
{$IFDEF OLD_SOCKETS}
property TcpClient: TCustomIpClient read FClient;
{$ELSE}
property TcpClient: TSocket read FClient;
{$ENDIF}
property Host : string read FHost;
property Port: Integer read FPort;
end;
TFramedTransportImpl = class( TLayeredTransportBase<ITransport>)
strict protected type
TFramedHeader = Int32;
strict protected
FWriteBuffer : TMemoryStream;
FReadBuffer : TMemoryStream;
procedure InitWriteBuffer;
procedure ReadFrame;
procedure Open(); override;
function GetIsOpen: Boolean; override;
procedure Close(); override;
function Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer; override;
procedure Write( const pBuf : Pointer; off, len : Integer); override;
procedure CheckReadBytesAvailable( const value : Int64); override;
procedure Flush; override;
public
type
TFactory = class( TTransportFactoryImpl )
public
function GetTransport( const aTransport: ITransport): ITransport; override;
end;
constructor Create( const aTransport: ITransport); overload;
destructor Destroy; override;
end;
const
DEFAULT_THRIFT_SECUREPROTOCOLS = [ TSecureProtocol.TLS_1_1, TSecureProtocol.TLS_1_2];
implementation
{ TTransportBase }
procedure TTransportBase.Flush;
begin
// nothing to do
end;
function TTransportBase.Peek: Boolean;
begin
Result := IsOpen;
end;
function TTransportBase.Read(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := Read( @buf[0], Length(buf), off, len)
else result := 0;
end;
function TTransportBase.ReadAll(var buf: TBytes; off: Integer; len: Integer): Integer;
begin
if Length(buf) > 0
then result := ReadAll( @buf[0], Length(buf), off, len)
else result := 0;
end;
function TTransportBase.ReadAll(const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var ret : Integer;
begin
result := 0;
while result < len do begin
ret := Read( pBuf, buflen, off + result, len - result);
if ret > 0
then Inc( result, ret)
else raise TTransportExceptionNotOpen.Create( 'Cannot read, Remote side has closed' );
end;
end;
procedure TTransportBase.Write( const buf: TBytes);
begin
if Length(buf) > 0
then Write( @buf[0], 0, Length(buf));
end;
procedure TTransportBase.Write( const buf: TBytes; off: Integer; len: Integer);
begin
if Length(buf) > 0
then Write( @buf[0], off, len);
end;
procedure TTransportBase.Write( const pBuf : Pointer; len : Integer);
begin
Self.Write( pBuf, 0, len);
end;
{ TEndpointTransportBase }
constructor TEndpointTransportBase.Create( const aConfig : IThriftConfiguration);
begin
inherited Create;
if aConfig <> nil
then FConfiguration := aConfig
else FConfiguration := TThriftConfigurationImpl.Create;
ResetConsumedMessageSize;
end;
function TEndpointTransportBase.Configuration : IThriftConfiguration;
begin
result := FConfiguration;
end;
function TEndpointTransportBase.MaxMessageSize : Integer;
begin
ASSERT( Configuration <> nil);
result := Configuration.MaxMessageSize;
end;
procedure TEndpointTransportBase.ResetConsumedMessageSize( const newSize : Int64);
// Resets RemainingMessageSize to the configured maximum
begin
// full reset
if newSize < 0 then begin
FKnownMessageSize := MaxMessageSize;
FRemainingMessageSize := MaxMessageSize;
Exit;
end;
// update only: message size can shrink, but not grow
ASSERT( KnownMessageSize <= MaxMessageSize);
if newSize > KnownMessageSize
then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
FKnownMessageSize := newSize;
FRemainingMessageSize := newSize;
end;
procedure TEndpointTransportBase.UpdateKnownMessageSize( const size : Int64);
// Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
// Will throw if we already consumed too many bytes.
var consumed : Int64;
begin
consumed := KnownMessageSize - RemainingMessageSize;
ResetConsumedMessageSize(size);
CountConsumedMessageBytes(consumed);
end;
procedure TEndpointTransportBase.CheckReadBytesAvailable( const numBytes : Int64);
// Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
begin
if RemainingMessageSize < numBytes
then raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
end;
procedure TEndpointTransportBase.CountConsumedMessageBytes( const numBytes : Int64);
// Consumes numBytes from the RemainingMessageSize.
begin
if (RemainingMessageSize >= numBytes)
then Dec( FRemainingMessageSize, numBytes)
else begin
FRemainingMessageSize := 0;
raise TTransportExceptionEndOfFile.Create('MaxMessageSize reached');
end;
end;
{ TLayeredTransportBase }
constructor TLayeredTransportBase<T>.Create( const aTransport: T);
begin
inherited Create;
FTransport := aTransport;
end;
function TLayeredTransportBase<T>.GetUnderlyingTransport: ITransport;
begin
result := InnerTransport;
end;
function TLayeredTransportBase<T>.Configuration : IThriftConfiguration;
begin
result := InnerTransport.Configuration;
end;
procedure TLayeredTransportBase<T>.UpdateKnownMessageSize( const size : Int64);
begin
InnerTransport.UpdateKnownMessageSize( size);
end;
function TLayeredTransportBase<T>.MaxMessageSize : Integer;
begin
result := InnerTransport.MaxMessageSize;
end;
procedure TLayeredTransportBase<T>.ResetConsumedMessageSize( const knownSize : Int64 = -1);
begin
InnerTransport.ResetConsumedMessageSize( knownSize);
end;
procedure TLayeredTransportBase<T>.CheckReadBytesAvailable( const numBytes : Int64);
begin
InnerTransport.CheckReadBytesAvailable( numBytes);
end;
{ TTransportException }
constructor TTransportException.HiddenCreate(const Msg: string);
begin
inherited Create(Msg);
end;
class function TTransportException.Create(aType: TExceptionType): TTransportException;
begin
//no inherited;
{$WARN SYMBOL_DEPRECATED OFF}
Result := Create(aType, '')
{$WARN SYMBOL_DEPRECATED DEFAULT}
end;
class function TTransportException.Create(aType: TExceptionType; const msg: string): TTransportException;
begin
case aType of
TExceptionType.NotOpen: Result := TTransportExceptionNotOpen.Create(msg);
TExceptionType.AlreadyOpen: Result := TTransportExceptionAlreadyOpen.Create(msg);
TExceptionType.TimedOut: Result := TTransportExceptionTimedOut.Create(msg);
TExceptionType.EndOfFile: Result := TTransportExceptionEndOfFile.Create(msg);
TExceptionType.BadArgs: Result := TTransportExceptionBadArgs.Create(msg);
TExceptionType.Interrupted: Result := TTransportExceptionInterrupted.Create(msg);
else
ASSERT( TExceptionType.Unknown = aType);
Result := TTransportExceptionUnknown.Create(msg);
end;
end;
class function TTransportException.Create(const msg: string): TTransportException;
begin
Result := TTransportExceptionUnknown.Create(Msg);
end;
{ TTransportExceptionSpecialized }
constructor TTransportExceptionSpecialized.Create(const Msg: string);
begin
inherited HiddenCreate(Msg);
end;
{ specialized TTransportExceptions }
class function TTransportExceptionUnknown.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.Unknown;
end;
class function TTransportExceptionNotOpen.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.NotOpen;
end;
class function TTransportExceptionAlreadyOpen.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.AlreadyOpen;
end;
class function TTransportExceptionTimedOut.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.TimedOut;
end;
class function TTransportExceptionEndOfFile.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.EndOfFile;
end;
class function TTransportExceptionBadArgs.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.BadArgs;
end;
class function TTransportExceptionInterrupted.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.Interrupted;
end;
class function TTransportExceptionCorruptedData.GetType: TTransportException.TExceptionType;
begin
result := TExceptionType.CorruptedData;
end;
{ TTransportFactoryImpl }
function TTransportFactoryImpl.GetTransport( const aTransport: ITransport): ITransport;
begin
Result := aTransport;
end;
{ TServerTransportImpl }
constructor TServerTransportImpl.Create( const aConfig : IThriftConfiguration);
begin
inherited Create;
if aConfig <> nil
then FConfig := aConfig
else FConfig := TThriftConfigurationImpl.Create;
end;
function TServerTransportImpl.Configuration : IThriftConfiguration;
begin
result := FConfig;
end;
{ TServerSocket }
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const aServer: TTcpServer; const aClientTimeout : Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TServerSocketImpl.Create( const aServer: TServerSocket; const aClientTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
inherited Create( aConfig);
FServer := aServer;
{$IFDEF OLD_SOCKETS}
FClientTimeout := aClientTimeout;
{$ELSE}
FServer.RecvTimeout := aClientTimeout;
FServer.SendTimeout := aClientTimeout;
{$ENDIF}
end;
{$IFDEF OLD_SOCKETS}
constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Integer; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TServerSocketImpl.Create( const aPort: Integer; const aClientTimeout: Longword; aUseBufferedSockets: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
inherited Create( aConfig);
{$IFDEF OLD_SOCKETS}
FPort := aPort;
FClientTimeout := aClientTimeout;
FOwnsServer := True;
FServer := TTcpServer.Create( nil );
FServer.BlockMode := bmBlocking;
{$IF CompilerVersion >= 21.0}
FServer.LocalPort := AnsiString( IntToStr( FPort));
{$ELSE}
FServer.LocalPort := IntToStr( FPort);
{$IFEND}
{$ELSE}
FOwnsServer := True;
FServer := TServerSocket.Create(aPort, aClientTimeout, aClientTimeout);
{$ENDIF}
FUseBufferedSocket := aUseBufferedSockets;
end;
destructor TServerSocketImpl.Destroy;
begin
if FOwnsServer then begin
FServer.Free;
FServer := nil;
end;
inherited;
end;
function TServerSocketImpl.Accept( const fnAccepting: TProc): ITransport;
var
{$IFDEF OLD_SOCKETS}
client : TCustomIpClient;
{$ELSE}
client: TSocket;
{$ENDIF}
trans : IStreamTransport;
begin
if FServer = nil then begin
raise TTransportExceptionNotOpen.Create('No underlying server socket.');
end;
{$IFDEF OLD_SOCKETS}
client := nil;
try
client := TCustomIpClient.Create(nil);
if Assigned(fnAccepting)
then fnAccepting();
if not FServer.Accept( client) then begin
client.Free;
Result := nil;
Exit;
end;
if client = nil then begin
Result := nil;
Exit;
end;
trans := TSocketImpl.Create( client, TRUE, FClientTimeout, Configuration);
client := nil; // trans owns it now
if FUseBufferedSocket
then result := TBufferedTransportImpl.Create( trans)
else result := trans;
except
on E: Exception do begin
client.Free;
raise TTransportExceptionUnknown.Create(E.ToString);
end;
end;
{$ELSE}
if Assigned(fnAccepting) then
fnAccepting();
client := FServer.Accept;
try
trans := TSocketImpl.Create(client, TRUE, Configuration);
client := nil;
if FUseBufferedSocket then
Result := TBufferedTransportImpl.Create(trans)
else
Result := trans;
except
client.Free;
raise;
end;
{$ENDIF}
end;
procedure TServerSocketImpl.Listen;
begin
if FServer <> nil then
begin
{$IFDEF OLD_SOCKETS}
try
FServer.Active := True;
except
on E: Exception
do raise TTransportExceptionUnknown.Create('Could not accept on listening socket: ' + E.Message);
end;
{$ELSE}
FServer.Listen;
{$ENDIF}
end;
end;
procedure TServerSocketImpl.Close;
begin
if FServer <> nil then
{$IFDEF OLD_SOCKETS}
try
FServer.Active := False;
except
on E: Exception
do raise TTransportExceptionUnknown.Create('Error on closing socket : ' + E.Message);
end;
{$ELSE}
FServer.Close;
{$ENDIF}
end;
{ TSocket }
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create( const aClient : TCustomIpClient; const aOwnsClient : Boolean; const aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TSocketImpl.Create(const aClient: TSocket; const aOwnsClient: Boolean; const aConfig : IThriftConfiguration);
{$ENDIF}
var stream : IThriftStream;
begin
FClient := aClient;
FOwnsClient := aOwnsClient;
{$IFDEF OLD_SOCKETS}
FTimeout := aTimeout;
{$ELSE}
FTimeout := aClient.RecvTimeout;
{$ENDIF}
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
inherited Create( stream, stream, aConfig);
end;
{$IFDEF OLD_SOCKETS}
constructor TSocketImpl.Create(const aHost: string; const aPort, aTimeout: Integer; const aConfig : IThriftConfiguration);
{$ELSE}
constructor TSocketImpl.Create(const aHost: string; const aPort : Integer; const aTimeout: Longword; const aConfig : IThriftConfiguration);
{$ENDIF}
begin
inherited Create(nil,nil, aConfig);
FHost := aHost;
FPort := aPort;
FTimeout := aTimeout;
InitSocket;
end;
destructor TSocketImpl.Destroy;
begin
if FOwnsClient
then FreeAndNil( FClient);
inherited;
end;
procedure TSocketImpl.Close;
begin
inherited Close;
FInputStream := nil;
FOutputStream := nil;
if FOwnsClient
then FreeAndNil( FClient)
else FClient := nil;
end;
function TSocketImpl.GetIsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
Result := (FClient <> nil) and FClient.Connected;
{$ELSE}
Result := (FClient <> nil) and FClient.IsOpen
{$ENDIF}
end;
procedure TSocketImpl.InitSocket;
var
stream : IThriftStream;
begin
if FOwnsClient
then FreeAndNil( FClient)
else FClient := nil;
{$IFDEF OLD_SOCKETS}
FClient := TTcpClient.Create( nil);
{$ELSE}
FClient := TSocket.Create(FHost, FPort);
{$ENDIF}
FOwnsClient := True;
stream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
FInputStream := stream;
FOutputStream := stream;
end;
procedure TSocketImpl.Open;
begin
if IsOpen then begin
raise TTransportExceptionAlreadyOpen.Create('Socket already connected');
end;
if FHost = '' then begin
raise TTransportExceptionNotOpen.Create('Cannot open null host');
end;
if Port <= 0 then begin
raise TTransportExceptionNotOpen.Create('Cannot open without port');
end;
if FClient = nil
then InitSocket;
{$IFDEF OLD_SOCKETS}
FClient.RemoteHost := TSocketHost( Host);
FClient.RemotePort := TSocketPort( IntToStr( Port));
FClient.Connect;
{$ELSE}
FClient.Open;
{$ENDIF}
FInputStream := TTcpSocketStreamImpl.Create( FClient, FTimeout);
FOutputStream := FInputStream;
end;
{ TBufferedStream }
procedure TBufferedStreamImpl.Close;
begin
Flush;
FStream := nil;
FReadBuffer.Free;
FReadBuffer := nil;
FWriteBuffer.Free;
FWriteBuffer := nil;
end;
constructor TBufferedStreamImpl.Create( const aStream: IThriftStream; const aBufSize : Integer);
begin
inherited Create;
FStream := aStream;
FBufSize := aBufSize;
FReadBuffer := TMemoryStream.Create;
FWriteBuffer := TMemoryStream.Create;
end;
destructor TBufferedStreamImpl.Destroy;
begin
Close;
inherited;
end;
procedure TBufferedStreamImpl.Flush;
var
buf : TBytes;
len : Integer;
begin
if IsOpen then begin
len := FWriteBuffer.Size;
if len > 0 then begin
SetLength( buf, len );
FWriteBuffer.Position := 0;
FWriteBuffer.Read( Pointer(@buf[0])^, len );
FStream.Write( buf, 0, len );
end;
FWriteBuffer.Clear;
end;
end;
function TBufferedStreamImpl.IsOpen: Boolean;
begin
Result := (FWriteBuffer <> nil)
and (FReadBuffer <> nil)
and (FStream <> nil)
and FStream.IsOpen;
end;
procedure TBufferedStreamImpl.Open;
begin
FStream.Open;
end;
function TBufferedStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
var
nRead : Integer;
tempbuf : TBytes;
pTmp : PByte;
begin
inherited;
Result := 0;
if IsOpen then begin
while count > 0 do begin
if FReadBuffer.Position >= FReadBuffer.Size then begin
FReadBuffer.Clear;
SetLength( tempbuf, FBufSize);
nRead := FStream.Read( tempbuf, 0, FBufSize );
if nRead = 0 then Break; // avoid infinite loop
FReadBuffer.WriteBuffer( Pointer(@tempbuf[0])^, nRead );
FReadBuffer.Position := 0;
end;
if FReadBuffer.Position < FReadBuffer.Size then begin
nRead := Min( FReadBuffer.Size - FReadBuffer.Position, count);
pTmp := pBuf;
Inc( pTmp, offset);
Inc( Result, FReadBuffer.Read( pTmp^, nRead));
Dec( count, nRead);
Inc( offset, nRead);
end;
end;
end;
end;
function TBufferedStreamImpl.ToArray: TBytes;
var len : Integer;
begin
if IsOpen
then len := FReadBuffer.Size
else len := 0;
SetLength( Result, len);
if len > 0 then begin
FReadBuffer.Position := 0;
FReadBuffer.Read( Pointer(@Result[0])^, len );
end;
end;
procedure TBufferedStreamImpl.Write( const pBuf : Pointer; offset: Integer; count: Integer);
var pTmp : PByte;
begin
inherited;
if count > 0 then begin
if IsOpen then begin
pTmp := pBuf;
Inc( pTmp, offset);
FWriteBuffer.Write( pTmp^, count );
if FWriteBuffer.Size > FBufSize then begin
Flush;
end;
end;
end;
end;
function TBufferedStreamImpl.Size : Int64;
begin
result := FReadBuffer.Size;
end;
function TBufferedStreamImpl.Position : Int64;
begin
result := FReadBuffer.Position;
end;
{ TStreamTransportImpl }
constructor TStreamTransportImpl.Create( const aInputStream, aOutputStream : IThriftStream; const aConfig : IThriftConfiguration);
begin
inherited Create( aConfig);
FInputStream := aInputStream;
FOutputStream := aOutputStream;
end;
destructor TStreamTransportImpl.Destroy;
begin
FInputStream := nil;
FOutputStream := nil;
inherited;
end;
procedure TStreamTransportImpl.Close;
begin
FInputStream := nil;
FOutputStream := nil;
end;
procedure TStreamTransportImpl.Flush;
begin
if FOutputStream = nil then begin
raise TTransportExceptionNotOpen.Create('Cannot flush null outputstream' );
end;
FOutputStream.Flush;
end;
function TStreamTransportImpl.GetInputStream: IThriftStream;
begin
Result := FInputStream;
end;
function TStreamTransportImpl.GetIsOpen: Boolean;
begin
Result := True;
end;
function TStreamTransportImpl.GetOutputStream: IThriftStream;
begin
Result := FOutputStream;
end;
procedure TStreamTransportImpl.Open;
begin
// nothing to do
end;
function TStreamTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot read from null inputstream' );
Result := FInputStream.Read( pBuf,buflen, off, len );
CountConsumedMessageBytes( result);
end;
procedure TStreamTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
if FOutputStream = nil
then raise TTransportExceptionNotOpen.Create('Cannot write to null outputstream' );
FOutputStream.Write( pBuf, off, len );
end;
{ TBufferedTransportImpl }
constructor TBufferedTransportImpl.Create( const aTransport : IStreamTransport; const aBufSize: Integer);
begin
ASSERT( aTransport <> nil);
inherited Create( aTransport);
FBufSize := aBufSize;
InitBuffers;
end;
procedure TBufferedTransportImpl.Close;
begin
InnerTransport.Close;
FInputBuffer := nil;
FOutputBuffer := nil;
end;
procedure TBufferedTransportImpl.Flush;
begin
if FOutputBuffer <> nil then begin
FOutputBuffer.Flush;
end;
end;
function TBufferedTransportImpl.GetIsOpen: Boolean;
begin
Result := InnerTransport.IsOpen;
end;
procedure TBufferedTransportImpl.InitBuffers;
begin
if InnerTransport.InputStream <> nil then begin
FInputBuffer := TBufferedStreamImpl.Create( InnerTransport.InputStream, FBufSize );
end;
if InnerTransport.OutputStream <> nil then begin
FOutputBuffer := TBufferedStreamImpl.Create( InnerTransport.OutputStream, FBufSize );
end;
end;
procedure TBufferedTransportImpl.Open;
begin
InnerTransport.Open;
InitBuffers; // we need to get the buffers to match FTransport substreams again
end;
function TBufferedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
begin
if FInputBuffer <> nil
then Result := FInputBuffer.Read( pBuf,buflen, off, len )
else Result := 0;
end;
procedure TBufferedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
begin
if FOutputBuffer <> nil then begin
FOutputBuffer.Write( pBuf, off, len );
end;
end;
procedure TBufferedTransportImpl.CheckReadBytesAvailable( const value : Int64);
var buffered, need : Int64;
begin
need := value;
// buffered bytes
buffered := FInputBuffer.Size - FInputBuffer.Position;
if buffered < need
then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
{ TBufferedTransportImpl.TFactory }
function TBufferedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
begin
Result := TFramedTransportImpl.Create( aTransport);
end;
{ TFramedTransportImpl }
constructor TFramedTransportImpl.Create( const aTransport: ITransport);
begin
ASSERT( aTransport <> nil);
inherited Create( aTransport);
InitWriteBuffer;
end;
destructor TFramedTransportImpl.Destroy;
begin
FWriteBuffer.Free;
FWriteBuffer := nil;
FReadBuffer.Free;
FReadBuffer := nil;
inherited;
end;
procedure TFramedTransportImpl.Close;
begin
InnerTransport.Close;
end;
procedure TFramedTransportImpl.Flush;
var
buf : TBytes;
len : Integer;
data_len : Int64;
begin
if not IsOpen
then raise TTransportExceptionNotOpen.Create('not open');
len := FWriteBuffer.Size;
SetLength( buf, len);
if len > 0 then begin
System.Move( FWriteBuffer.Memory^, buf[0], len );
end;
data_len := len - SizeOf(TFramedHeader);
if (0 > data_len) or (data_len > Configuration.MaxFrameSize)
then raise TTransportExceptionUnknown.Create('TFramedTransport.Flush: invalid frame size ('+IntToStr(data_len)+')')
else UpdateKnownMessageSize( len);
InitWriteBuffer;
buf[0] := Byte($FF and (data_len shr 24));
buf[1] := Byte($FF and (data_len shr 16));
buf[2] := Byte($FF and (data_len shr 8));
buf[3] := Byte($FF and data_len);
InnerTransport.Write( buf, 0, len );
InnerTransport.Flush;
end;
function TFramedTransportImpl.GetIsOpen: Boolean;
begin
Result := InnerTransport.IsOpen;
end;
type
TAccessMemoryStream = class(TMemoryStream)
end;
procedure TFramedTransportImpl.InitWriteBuffer;
const DUMMY_HEADER : TFramedHeader = 0;
begin
FreeAndNil( FWriteBuffer);
FWriteBuffer := TMemoryStream.Create;
TAccessMemoryStream(FWriteBuffer).Capacity := 1024;
FWriteBuffer.Write( DUMMY_HEADER, SizeOf(DUMMY_HEADER));
end;
procedure TFramedTransportImpl.Open;
begin
InnerTransport.Open;
end;
function TFramedTransportImpl.Read( const pBuf : Pointer; const buflen : Integer; off: Integer; len: Integer): Integer;
var pTmp : PByte;
begin
if len > (buflen-off)
then len := buflen-off;
pTmp := pBuf;
Inc( pTmp, off);
if (FReadBuffer <> nil) and (len > 0) then begin
result := FReadBuffer.Read( pTmp^, len);
if result > 0 then Exit;
end;
ReadFrame;
if len > 0
then Result := FReadBuffer.Read( pTmp^, len)
else Result := 0;
end;
procedure TFramedTransportImpl.ReadFrame;
var
i32rd : packed array[0..SizeOf(TFramedHeader)-1] of Byte;
size : Integer;
buff : TBytes;
begin
InnerTransport.ReadAll( @i32rd[0], SizeOf(i32rd), 0, SizeOf(i32rd));
size :=
((i32rd[0] and $FF) shl 24) or
((i32rd[1] and $FF) shl 16) or
((i32rd[2] and $FF) shl 8) or
(i32rd[3] and $FF);
if size < 0 then begin
Close();
raise TTransportExceptionCorruptedData.Create('Read a negative frame size ('+IntToStr(size)+')');
end;
if Int64(size) > Int64(Configuration.MaxFrameSize) then begin
Close();
raise TTransportExceptionCorruptedData.Create('Frame size ('+IntToStr(size)+') larger than allowed maximum ('+IntToStr(Configuration.MaxFrameSize)+')');
end;
UpdateKnownMessageSize(size + SizeOf(size));
SetLength( buff, size );
InnerTransport.ReadAll( buff, 0, size );
FreeAndNil( FReadBuffer);
FReadBuffer := TMemoryStream.Create;
if Length(buff) > 0
then FReadBuffer.Write( Pointer(@buff[0])^, size );
FReadBuffer.Position := 0;
end;
procedure TFramedTransportImpl.Write( const pBuf : Pointer; off, len : Integer);
var pTmp : PByte;
begin
if len > 0 then begin
pTmp := pBuf;
Inc( pTmp, off);
FWriteBuffer.Write( pTmp^, len );
end;
end;
procedure TFramedTransportImpl.CheckReadBytesAvailable( const value : Int64);
var buffered, need : Int64;
begin
need := value;
// buffered bytes
buffered := FReadBuffer.Size - FReadBuffer.Position;
if buffered < need
then InnerTransport.CheckReadBytesAvailable( need - buffered);
end;
{ TFramedTransport.TFactory }
function TFramedTransportImpl.TFactory.GetTransport( const aTransport: ITransport): ITransport;
begin
Result := TFramedTransportImpl.Create( aTransport);
end;
{ TTcpSocketStreamImpl }
procedure TTcpSocketStreamImpl.Close;
begin
FTcpClient.Close;
end;
{$IFDEF OLD_SOCKETS}
constructor TTcpSocketStreamImpl.Create( const aTcpClient: TCustomIpClient; const aTimeout : Integer);
begin
inherited Create;
FTcpClient := aTcpClient;
FTimeout := aTimeout;
end;
{$ELSE}
constructor TTcpSocketStreamImpl.Create( const aTcpClient: TSocket; const aTimeout : Longword);
begin
inherited Create;
FTcpClient := aTcpClient;
if aTimeout = 0 then
FTcpClient.RecvTimeout := SLEEP_TIME
else
FTcpClient.RecvTimeout := aTimeout;
FTcpClient.SendTimeout := aTimeout;
end;
{$ENDIF}
procedure TTcpSocketStreamImpl.Flush;
begin
// nothing to do
end;
function TTcpSocketStreamImpl.IsOpen: Boolean;
begin
{$IFDEF OLD_SOCKETS}
Result := FTcpClient.Active;
{$ELSE}
Result := FTcpClient.IsOpen;
{$ENDIF}
end;
procedure TTcpSocketStreamImpl.Open;
begin
FTcpClient.Open;
end;
{$IFDEF OLD_SOCKETS}
function TTcpSocketStreamImpl.Select( ReadReady, WriteReady, ExceptFlag: PBoolean;
TimeOut: Integer; var wsaError : Integer): Integer;
var
ReadFds: TFDset;
ReadFdsptr: PFDset;
WriteFds: TFDset;
WriteFdsptr: PFDset;
ExceptFds: TFDset;
ExceptFdsptr: PFDset;
tv: timeval;
Timeptr: PTimeval;
socket : TSocket;
begin
if not FTcpClient.Active then begin
wsaError := WSAEINVAL;
Exit( SOCKET_ERROR);
end;
socket := FTcpClient.Handle;
if Assigned(ReadReady) then begin
ReadFdsptr := @ReadFds;
FD_ZERO(ReadFds);
FD_SET(socket, ReadFds);
end
else begin
ReadFdsptr := nil;
end;
if Assigned(WriteReady) then begin
WriteFdsptr := @WriteFds;
FD_ZERO(WriteFds);
FD_SET(socket, WriteFds);
end
else begin
WriteFdsptr := nil;
end;
if Assigned(ExceptFlag) then begin
ExceptFdsptr := @ExceptFds;
FD_ZERO(ExceptFds);
FD_SET(socket, ExceptFds);
end
else begin
ExceptFdsptr := nil;
end;
if TimeOut >= 0 then begin
tv.tv_sec := TimeOut div 1000;
tv.tv_usec := 1000 * (TimeOut mod 1000);
Timeptr := @tv;
end
else begin
Timeptr := nil; // wait forever
end;
wsaError := 0;
try
{$IFDEF MSWINDOWS}
{$IFDEF OLD_UNIT_NAMES}
result := WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
{$ELSE}
result := Winapi.WinSock.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
{$ENDIF}
{$ENDIF}
{$IFDEF LINUX}
result := Libc.select( socket + 1, ReadFdsptr, WriteFdsptr, ExceptFdsptr, Timeptr);
{$ENDIF}
if result = SOCKET_ERROR
then wsaError := WSAGetLastError;
except
result := SOCKET_ERROR;
end;
if Assigned(ReadReady) then
ReadReady^ := FD_ISSET(socket, ReadFds);
if Assigned(WriteReady) then
WriteReady^ := FD_ISSET(socket, WriteFds);
if Assigned(ExceptFlag) then
ExceptFlag^ := FD_ISSET(socket, ExceptFds);
end;
{$ENDIF}
{$IFDEF OLD_SOCKETS}
function TTcpSocketStreamImpl.WaitForData( TimeOut : Integer; pBuf : Pointer;
DesiredBytes : Integer;
var wsaError, bytesReady : Integer): TWaitForData;
var bCanRead, bError : Boolean;
retval : Integer;
const
MSG_PEEK = {$IFDEF OLD_UNIT_NAMES} WinSock.MSG_PEEK {$ELSE} Winapi.WinSock.MSG_PEEK {$ENDIF};
begin
bytesReady := 0;
// The select function returns the total number of socket handles that are ready
// and contained in the fd_set structures, zero if the time limit expired,
// or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
// WSAGetLastError can be used to retrieve a specific error code.
retval := Self.Select( @bCanRead, nil, @bError, TimeOut, wsaError);
if retval = SOCKET_ERROR
then Exit( TWaitForData.wfd_Error);
if (retval = 0) or not bCanRead
then Exit( TWaitForData.wfd_Timeout);
// recv() returns the number of bytes received, or -1 if an error occurred.
// The return value will be 0 when the peer has performed an orderly shutdown.
retval := recv( FTcpClient.Handle, pBuf^, DesiredBytes, MSG_PEEK);
if retval <= 0
then Exit( TWaitForData.wfd_Error);
// at least we have some data
bytesReady := Min( retval, DesiredBytes);
result := TWaitForData.wfd_HaveData;
end;
{$ENDIF}
{$IFDEF OLD_SOCKETS}
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// old sockets version
var wfd : TWaitForData;
wsaError,
msecs : Integer;
nBytes : Integer;
pTmp : PByte;
begin
inherited;
if FTimeout > 0
then msecs := FTimeout
else msecs := DEFAULT_THRIFT_TIMEOUT;
result := 0;
pTmp := pBuf;
Inc( pTmp, offset);
while (count > 0) and (result = 0) do begin
while TRUE do begin
wfd := WaitForData( msecs, pTmp, count, wsaError, nBytes);
case wfd of
TWaitForData.wfd_Error : Exit;
TWaitForData.wfd_HaveData : Break;
TWaitForData.wfd_Timeout : begin
if (FTimeout = 0)
then Exit
else raise TTransportExceptionTimedOut.Create(SysErrorMessage(Cardinal(wsaError)));
end;
else
ASSERT( FALSE);
end;
end;
// reduce the timeout once we got data
if FTimeout > 0
then msecs := FTimeout div 10
else msecs := DEFAULT_THRIFT_TIMEOUT div 10;
msecs := Max( msecs, 200);
ASSERT( nBytes <= count);
nBytes := FTcpClient.ReceiveBuf( pTmp^, nBytes);
Inc( pTmp, nBytes);
Dec( count, nBytes);
Inc( result, nBytes);
end;
end;
function TTcpSocketStreamImpl.ToArray: TBytes;
// old sockets version
var len : Integer;
begin
len := 0;
if IsOpen then begin
len := FTcpClient.BytesReceived;
end;
SetLength( Result, len );
if len > 0 then begin
FTcpClient.ReceiveBuf( Pointer(@Result[0])^, len);
end;
end;
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// old sockets version
var bCanWrite, bError : Boolean;
retval, wsaError : Integer;
pTmp : PByte;
begin
inherited;
if not FTcpClient.Active
then raise TTransportExceptionNotOpen.Create('not open');
// The select function returns the total number of socket handles that are ready
// and contained in the fd_set structures, zero if the time limit expired,
// or SOCKET_ERROR if an error occurred. If the return value is SOCKET_ERROR,
// WSAGetLastError can be used to retrieve a specific error code.
retval := Self.Select( nil, @bCanWrite, @bError, FTimeOut, wsaError);
if retval = SOCKET_ERROR
then raise TTransportExceptionUnknown.Create(SysErrorMessage(Cardinal(wsaError)));
if (retval = 0)
then raise TTransportExceptionTimedOut.Create('timed out');
if bError or not bCanWrite
then raise TTransportExceptionUnknown.Create('unknown error');
pTmp := pBuf;
Inc( pTmp, offset);
FTcpClient.SendBuf( pTmp^, count);
end;
{$ELSE}
function TTcpSocketStreamImpl.Read( const pBuf : Pointer; const buflen : Integer; offset: Integer; count: Integer): Integer;
// new sockets version
var nBytes : Integer;
pTmp : PByte;
begin
inherited;
result := 0;
pTmp := pBuf;
Inc( pTmp, offset);
while count > 0 do begin
nBytes := FTcpClient.Read( pTmp^, count);
if nBytes = 0 then Exit;
Inc( pTmp, nBytes);
Dec( count, nBytes);
Inc( result, nBytes);
end;
end;
function TTcpSocketStreamImpl.ToArray: TBytes;
// new sockets version
var len : Integer;
begin
len := 0;
try
if FTcpClient.Peek then
repeat
SetLength(Result, Length(Result) + 1024);
len := FTcpClient.Read(Result[Length(Result) - 1024], 1024);
until len < 1024;
except
on TTransportException do begin { don't allow default exceptions } end;
else raise;
end;
if len > 0 then
SetLength(Result, Length(Result) - 1024 + len);
end;
procedure TTcpSocketStreamImpl.Write( const pBuf : Pointer; offset, count: Integer);
// new sockets version
var pTmp : PByte;
begin
inherited;
if not FTcpClient.IsOpen
then raise TTransportExceptionNotOpen.Create('not open');
pTmp := pBuf;
Inc( pTmp, offset);
FTcpClient.Write( pTmp^, count);
end;
{$ENDIF}
end.