blob: 7dfb3763c146971c28c254ec268294b7d9488fda [file] [log] [blame]
(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*)
unit Thrift.Transport.STOMP;
interface
uses
Classes,Windows, SysUtils,
Thrift,
Thrift.Transport,
Thrift.Protocol,
Thrift.Stream,
StompClient,
StompTypes;
type
TStompTransportImpl = class( TStreamTransportImpl)
strict private
FData : TStringStream;
FServer : string;
FOutQueue : string;
FStompCli : IStompClient;
protected
function GetIsOpen: Boolean; override;
function Peek: Boolean; override;
public
constructor Create( const aServerAndPort, aOutQueue : string);
destructor Destroy; override;
procedure Open(); override;
procedure Close(); override;
procedure Flush; override;
end;
TStompServerTransportImpl = class( TServerTransportImpl)
strict private
FServer : string;
FInQueue : string;
FClient : IStompClient;
protected
procedure Listen; override;
procedure Close; override;
function Accept( const fnAccepting: TProc): ITransport; override;
public
constructor Create( const aServerAndPort, aInQueue : string);
destructor Destroy; override;
end;
const
QUEUE_PREFIX = '/queue/';
TOPIC_PREFIX = '/topic/';
EXCHANGE_PREFIX = '/exchange/';
implementation
constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string);
var adapter : IThriftStream;
begin
FData := TStringStream.Create;
FServer := aServerAndPort;
FOutQueue := aOutQueue;
adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE);
inherited Create( nil, adapter); // output only
end;
destructor TStompTransportImpl.Destroy;
begin
inherited Destroy;
FreeAndNil( FData);
FStompCli := nil;
end;
function TStompTransportImpl.GetIsOpen: Boolean;
begin
result := (FStompCli <> nil);
end;
function TStompTransportImpl.Peek: Boolean;
begin
result := FALSE; // output only
end;
procedure TStompTransportImpl.Open;
begin
if FStompCli <> nil
then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open')
else FStompCli := StompUtils.NewStomp( FServer);
end;
procedure TStompTransportImpl.Close;
begin
FStompCli := nil;
FData.Clear;
end;
procedure TStompTransportImpl.Flush;
begin
if FStompCli = nil
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open');
FStompCli.Send( FOutQueue, FData.DataString);
FData.Clear;
end;
//--- TStompServerTransportImpl --------------------------------------------
constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string);
begin
inherited Create;
FServer := aServerAndPort;
FInQueue := aInQueue;
end;
destructor TStompServerTransportImpl.Destroy;
begin
try
Close;
finally
inherited Destroy;
end;
end;
procedure TStompServerTransportImpl.Listen;
begin
FClient := StompUtils.NewStomp(FServer);
FClient.Subscribe( FInQueue);
end;
procedure TStompServerTransportImpl.Close;
begin
if FClient <> nil then begin
FClient.Unsubscribe( FInQueue);
FClient := nil;
end;
end;
function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport;
var frame : IStompFrame;
adapter : IThriftStream;
stream : TStringStream;
begin
if FClient = nil
then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen,
'Not connected.');
if Assigned(fnAccepting)
then fnAccepting();
try
frame := FClient.Receive(MAXINT);
if frame = nil then Exit(nil);
stream := TStringStream.Create( frame.GetBody);
adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE);
result := TStreamTransportImpl.Create( adapter, nil);
except
on E: Exception
do raise TTransportException.Create( E.ToString );
end;
end;
end.