blob: 0c9b6bce6af735526306857ccafc735cf568f72f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using Apache.NMS.Stomp.Commands;
using Apache.NMS.Util;
namespace Apache.NMS.Stomp.Transport.Tcp
{
/// <summary>
/// An implementation of ITransport that uses sockets to communicate with the broker
/// </summary>
public class TcpTransport : ITransport
{
protected readonly object myLock = new object();
protected readonly Socket socket;
private IWireFormat wireformat;
private BinaryReader socketReader;
private BinaryWriter socketWriter;
private Thread readThread;
private bool started;
private bool disposed = false;
private readonly Atomic<bool> closed = new Atomic<bool>(false);
private volatile bool seenShutdown;
private readonly Uri connectedUri;
private int timeout = -1;
private int asynctimeout = -1;
private CommandHandler commandHandler;
private ExceptionHandler exceptionHandler;
private InterruptedHandler interruptedHandler;
private ResumedHandler resumedHandler;
private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
public TcpTransport(Uri uri, Socket socket, IWireFormat wireformat)
{
this.connectedUri = uri;
this.socket = socket;
this.wireformat = wireformat;
}
~TcpTransport()
{
Dispose(false);
}
protected virtual Stream CreateSocketStream()
{
return new NetworkStream(socket);
}
/// <summary>
/// Method Start
/// </summary>
public void Start()
{
lock(myLock)
{
if(!started)
{
if(null == commandHandler)
{
throw new InvalidOperationException(
"command cannot be null when Start is called.");
}
if(null == exceptionHandler)
{
throw new InvalidOperationException(
"exception cannot be null when Start is called.");
}
started = true;
// Initialize our Read and Writer instances. Its not actually necessary
// to have two distinct NetworkStream instances but for now the TcpTransport
// will continue to do so for legacy reasons.
socketWriter = new BinaryWriter(CreateSocketStream());
socketReader = new BinaryReader(CreateSocketStream());
// now lets create the background read thread
readThread = new Thread(new ThreadStart(ReadLoop));
readThread.IsBackground = true;
readThread.Start();
}
}
}
/// <summary>
/// Property IsStarted
/// </summary>
public bool IsStarted
{
get
{
lock(myLock)
{
return started;
}
}
}
public void Oneway(Command command)
{
lock(myLock)
{
if(closed.Value)
{
throw new InvalidOperationException("Error writing to broker. Transport connection is closed.");
}
if(command is ShutdownInfo)
{
seenShutdown = true;
}
Wireformat.Marshal(command, socketWriter);
}
}
public FutureResponse AsyncRequest(Command command)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
}
public bool TcpNoDelayEnabled
{
#if !NETCF
get { return this.socket.NoDelay; }
set { this.socket.NoDelay = value; }
#else
get { return false; }
set { }
#endif
}
public Response Request(Command command)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
}
public Response Request(Command command, TimeSpan timeout)
{
throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
}
public void Stop()
{
Close();
}
public void Close()
{
if(closed.CompareAndSet(false, true))
{
lock(myLock)
{
try
{
socket.Shutdown(SocketShutdown.Both);
}
catch
{
}
try
{
if(null != socketWriter)
{
socketWriter.Close();
}
}
catch
{
}
finally
{
socketWriter = null;
}
try
{
if(null != socketReader)
{
socketReader.Close();
}
}
catch
{
}
finally
{
socketReader = null;
}
try
{
socket.Close();
}
catch
{
}
if(null != readThread)
{
if(Thread.CurrentThread != readThread
#if !NETCF
&& readThread.IsAlive
#endif
)
{
if(!readThread.Join((int) MAX_THREAD_WAIT.TotalMilliseconds))
{
readThread.Abort();
}
}
readThread = null;
}
started = false;
}
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
Close();
disposed = true;
}
public bool IsDisposed
{
get
{
return disposed;
}
}
public void ReadLoop()
{
// This is the thread function for the reader thread. This runs continuously
// performing a blokcing read on the socket and dispatching all commands
// received.
//
// Exception Handling
// ------------------
// If an Exception occurs during the reading/marshalling, then the connection
// is effectively broken because position cannot be re-established to the next
// message. This is reported to the app via the exceptionHandler and the socket
// is closed to prevent further communication attempts.
//
// An exception in the command handler may not be fatal to the transport, so
// these are simply reported to the exceptionHandler.
//
while(!closed.Value)
{
Command command = null;
try
{
command = (Command) Wireformat.Unmarshal(socketReader);
}
catch(Exception ex)
{
command = null;
if(!closed.Value)
{
// Close the socket as there's little that can be done with this transport now.
Close();
if(!seenShutdown)
{
this.exceptionHandler(this, ex);
}
}
break;
}
try
{
if(command != null)
{
this.commandHandler(this, command);
}
}
catch(Exception e)
{
this.exceptionHandler(this, e);
}
}
}
// Implementation methods
/// <summary>
/// Timeout in milliseconds to wait for sending synchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int Timeout
{
get { return this.timeout; }
set { this.timeout = value; }
}
/// <summary>
/// Timeout in milliseconds to wait for sending asynchronous messages or commands.
/// Set to -1 for infinite timeout.
/// </summary>
public int AsyncTimeout
{
get { return this.asynctimeout; }
set { this.asynctimeout = value; }
}
public CommandHandler Command
{
get { return commandHandler; }
set { this.commandHandler = value; }
}
public ExceptionHandler Exception
{
get { return exceptionHandler; }
set { this.exceptionHandler = value; }
}
public InterruptedHandler Interrupted
{
get { return interruptedHandler; }
set { this.interruptedHandler = value; }
}
public ResumedHandler Resumed
{
get { return resumedHandler; }
set { this.resumedHandler = value; }
}
public IWireFormat Wireformat
{
get { return wireformat; }
set { wireformat = value; }
}
public bool IsFaultTolerant
{
get { return false; }
}
public bool IsConnected
{
get { return socket.Connected; }
}
public Uri RemoteAddress
{
get { return connectedUri; }
}
public Object Narrow(Type type)
{
if(this.GetType().Equals(type))
{
return this;
}
return null;
}
}
}