| (* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| *) |
| |
| unit Thrift.Transport.STOMP; |
| |
| interface |
| |
| uses |
| Classes,Windows, SysUtils, |
| Thrift, |
| Thrift.Transport, |
| Thrift.Protocol, |
| Thrift.Stream, |
| StompClient, |
| StompTypes; |
| |
| type |
| TStompTransportImpl = class( TStreamTransportImpl) |
| strict private |
| FData : TStringStream; |
| FServer : string; |
| FOutQueue : string; |
| FStompCli : IStompClient; |
| protected |
| function GetIsOpen: Boolean; override; |
| function Peek: Boolean; override; |
| public |
| constructor Create( const aServerAndPort, aOutQueue : string); |
| destructor Destroy; override; |
| |
| procedure Open(); override; |
| procedure Close(); override; |
| procedure Flush; override; |
| end; |
| |
| |
| TStompServerTransportImpl = class( TServerTransportImpl) |
| strict private |
| FServer : string; |
| FInQueue : string; |
| FClient : IStompClient; |
| protected |
| procedure Listen; override; |
| procedure Close; override; |
| function Accept( const fnAccepting: TProc): ITransport; override; |
| public |
| constructor Create( const aServerAndPort, aInQueue : string); |
| destructor Destroy; override; |
| end; |
| |
| |
| const |
| QUEUE_PREFIX = '/queue/'; |
| TOPIC_PREFIX = '/topic/'; |
| EXCHANGE_PREFIX = '/exchange/'; |
| |
| |
| implementation |
| |
| |
| |
| constructor TStompTransportImpl.Create( const aServerAndPort, aOutQueue : string); |
| var adapter : IThriftStream; |
| begin |
| FData := TStringStream.Create; |
| FServer := aServerAndPort; |
| FOutQueue := aOutQueue; |
| |
| adapter := TThriftStreamAdapterDelphi.Create( FData, FALSE); |
| inherited Create( nil, adapter); // output only |
| end; |
| |
| |
| destructor TStompTransportImpl.Destroy; |
| begin |
| inherited Destroy; |
| FreeAndNil( FData); |
| FStompCli := nil; |
| end; |
| |
| |
| function TStompTransportImpl.GetIsOpen: Boolean; |
| begin |
| result := (FStompCli <> nil); |
| end; |
| |
| |
| function TStompTransportImpl.Peek: Boolean; |
| begin |
| result := FALSE; // output only |
| end; |
| |
| |
| procedure TStompTransportImpl.Open; |
| begin |
| if FStompCli <> nil |
| then raise TTransportException.Create( TTransportException.TExceptionType.AlreadyOpen, 'already open') |
| else FStompCli := StompUtils.NewStomp( FServer); |
| end; |
| |
| |
| procedure TStompTransportImpl.Close; |
| begin |
| FStompCli := nil; |
| FData.Clear; |
| end; |
| |
| |
| procedure TStompTransportImpl.Flush; |
| begin |
| if FStompCli = nil |
| then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, 'not open'); |
| |
| FStompCli.Send( FOutQueue, FData.DataString); |
| FData.Clear; |
| end; |
| |
| |
| //--- TStompServerTransportImpl -------------------------------------------- |
| |
| |
| constructor TStompServerTransportImpl.Create( const aServerAndPort, aInQueue : string); |
| begin |
| inherited Create; |
| FServer := aServerAndPort; |
| FInQueue := aInQueue; |
| end; |
| |
| |
| destructor TStompServerTransportImpl.Destroy; |
| begin |
| try |
| Close; |
| finally |
| inherited Destroy; |
| end; |
| end; |
| |
| |
| procedure TStompServerTransportImpl.Listen; |
| begin |
| FClient := StompUtils.NewStomp(FServer); |
| FClient.Subscribe( FInQueue); |
| end; |
| |
| |
| procedure TStompServerTransportImpl.Close; |
| begin |
| if FClient <> nil then begin |
| FClient.Unsubscribe( FInQueue); |
| FClient := nil; |
| end; |
| end; |
| |
| |
| function TStompServerTransportImpl.Accept( const fnAccepting: TProc): ITransport; |
| var frame : IStompFrame; |
| adapter : IThriftStream; |
| stream : TStringStream; |
| begin |
| if FClient = nil |
| then raise TTransportException.Create( TTransportException.TExceptionType.NotOpen, |
| 'Not connected.'); |
| |
| if Assigned(fnAccepting) |
| then fnAccepting(); |
| |
| try |
| frame := FClient.Receive(MAXINT); |
| if frame = nil then Exit(nil); |
| |
| stream := TStringStream.Create( frame.GetBody); |
| adapter := TThriftStreamAdapterDelphi.Create( stream, TRUE); |
| result := TStreamTransportImpl.Create( adapter, nil); |
| |
| except |
| on E: Exception |
| do raise TTransportException.Create( E.ToString ); |
| end; |
| end; |
| |
| |
| end. |
| |