| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.Threading; |
| using System.Collections; |
| |
| using Apache.NMS.Stomp.Commands; |
| |
| namespace Apache.NMS.Stomp.Transport |
| { |
| /// <summary> |
| /// A Transport that correlates asynchronous send/receive messages into single request/response. |
| /// </summary> |
| public class ResponseCorrelator : TransportFilter |
| { |
| private readonly IDictionary requestMap = Hashtable.Synchronized(new Hashtable()); |
| private int nextCommandId; |
| private Exception error; |
| |
| public ResponseCorrelator(ITransport next) : base(next) |
| { |
| } |
| |
| protected override void OnException(ITransport sender, Exception command) |
| { |
| Dispose(command); |
| base.OnException(sender, command); |
| } |
| |
| internal int GetNextCommandId() |
| { |
| return Interlocked.Increment(ref nextCommandId); |
| } |
| |
| public override void Oneway(Command command) |
| { |
| command.CommandId = GetNextCommandId(); |
| command.ResponseRequired = false; |
| |
| next.Oneway(command); |
| } |
| |
| public override FutureResponse AsyncRequest(Command command) |
| { |
| int commandId = GetNextCommandId(); |
| |
| command.CommandId = commandId; |
| command.ResponseRequired = true; |
| FutureResponse future = new FutureResponse(); |
| Exception priorError = null; |
| lock(requestMap.SyncRoot) |
| { |
| priorError = this.error; |
| if(priorError == null) |
| { |
| requestMap[commandId] = future; |
| } |
| } |
| |
| if(priorError != null) |
| { |
| BrokerError brError = new BrokerError(); |
| brError.Message = priorError.Message; |
| ExceptionResponse response = new ExceptionResponse(); |
| response.Exception = brError; |
| future.Response = response; |
| throw priorError; |
| } |
| |
| next.Oneway(command); |
| |
| return future; |
| } |
| |
| public override Response Request(Command command, TimeSpan timeout) |
| { |
| FutureResponse future = AsyncRequest(command); |
| future.ResponseTimeout = timeout; |
| Response response = future.Response; |
| |
| if(response != null && response is ExceptionResponse) |
| { |
| ExceptionResponse er = response as ExceptionResponse; |
| BrokerError brokerError = er.Exception; |
| |
| if(brokerError == null) |
| { |
| throw new BrokerException(); |
| } |
| else |
| { |
| throw new BrokerException(brokerError); |
| } |
| } |
| |
| return response; |
| } |
| |
| protected override void OnCommand(ITransport sender, Command command) |
| { |
| if(command is Response) |
| { |
| Response response = (Response) command; |
| int correlationId = response.CorrelationId; |
| FutureResponse future = (FutureResponse) requestMap[correlationId]; |
| |
| if(future != null) |
| { |
| requestMap.Remove(correlationId); |
| future.Response = response; |
| |
| if(response is ExceptionResponse) |
| { |
| ExceptionResponse er = response as ExceptionResponse; |
| BrokerError brokerError = er.Exception; |
| BrokerException exception = new BrokerException(brokerError); |
| this.exceptionHandler(this, exception); |
| } |
| } |
| else |
| { |
| if(Tracer.IsDebugEnabled) |
| { |
| Tracer.Debug("Unknown response ID: " + response.CommandId + " for response: " + response); |
| } |
| } |
| } |
| else |
| { |
| this.commandHandler(sender, command); |
| } |
| } |
| |
| public override void Stop() |
| { |
| this.Dispose(new IOException("Stopped")); |
| base.Stop(); |
| } |
| |
| private void Dispose(Exception error) |
| { |
| ArrayList requests = null; |
| |
| lock(requestMap.SyncRoot) |
| { |
| if(this.error == null) |
| { |
| this.error = error; |
| requests = new ArrayList(requestMap.Values); |
| requestMap.Clear(); |
| } |
| } |
| |
| if(requests != null) |
| { |
| foreach(FutureResponse future in requests) |
| { |
| BrokerError brError = new BrokerError(); |
| brError.Message = error.Message; |
| ExceptionResponse response = new ExceptionResponse(); |
| response.Exception = brError; |
| future.Response = response; |
| } |
| } |
| } |
| |
| } |
| } |