blob: 2899311973e1f80143a8d6cf4881496d916129bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TcpTransport.h"
#include <activemq/transport/IOTransport.h>
#include <activemq/transport/TransportFactory.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/net/SocketFactory.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <memory>
using namespace std;
using namespace activemq;
using namespace activemq::io;
using namespace activemq::transport;
using namespace activemq::transport::tcp;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::net;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace decaf::io;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
namespace activemq {
namespace transport {
namespace tcp {
class TcpTransportImpl {
private:
TcpTransportImpl(const TcpTransportImpl&);
TcpTransportImpl& operator= (const TcpTransportImpl&);
public:
int connectTimeout;
std::auto_ptr<decaf::net::Socket> socket;
std::auto_ptr<decaf::io::DataInputStream> dataInputStream;
std::auto_ptr<decaf::io::DataOutputStream> dataOutputStream;
const decaf::net::URI& location;
int outputBufferSize;
int inputBufferSize;
bool trace;
int soLinger;
bool soKeepAlive;
int soReceiveBufferSize;
int soSendBufferSize;
bool tcpNoDelay;
TcpTransportImpl(const decaf::net::URI& location) :
connectTimeout(0),
socket(),
dataInputStream(),
dataOutputStream(),
location(location),
outputBufferSize(8192),
inputBufferSize(8192),
trace(false),
soLinger(-1),
soKeepAlive(false),
soReceiveBufferSize(-1),
soSendBufferSize(-1),
tcpNoDelay(true) {
}
};
}}}
////////////////////////////////////////////////////////////////////////////////
TcpTransport::TcpTransport(const Pointer<Transport> next, const decaf::net::URI& location) :
TransportFilter(next), impl(new TcpTransportImpl(location)) {
}
////////////////////////////////////////////////////////////////////////////////
TcpTransport::~TcpTransport() {
try {
close();
}
AMQ_CATCHALL_NOTHROW()
try {
delete this->impl;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::beforeNextIsStarted() {
try {
connect();
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::afterNextIsStopped() {
try {
// The IOTransport is now stopped, so we can safely closed the socket
// and no asynchronous exceptions should be triggered.
if (impl->socket.get() != NULL) {
impl->socket->close();
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::doClose() {
try {
if (impl->socket.get() != NULL) {
impl->socket->close();
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::connect() {
try {
impl->socket.reset(this->createSocket());
// Set all Socket Options from the URI options.
this->configureSocket(impl->socket.get());
URI uri = this->impl->location;
// Ensure something is actually passed in for the URI
if (uri.getAuthority() == "") {
throw SocketException(__FILE__, __LINE__,
"Connection URI was not provided or is invalid: %s", uri.toString().c_str());
}
// Connect the socket.
string host = uri.getHost();
int port = uri.getPort();
impl->socket->connect(host, port, impl->connectTimeout);
// Cast it to an IO transport so we can wire up the socket
// input and output streams.
IOTransport* ioTransport = dynamic_cast<IOTransport*>(next.get());
if (ioTransport == NULL) {
throw ActiveMQException(__FILE__, __LINE__, "TcpTransport::TcpTransport - "
"transport must be of type IOTransport");
}
// Get the read buffer size.
int inputBufferSize = this->impl->inputBufferSize;
// Get the write buffer size.
int outputBufferSize = this->impl->outputBufferSize;
// We don't own these ever, socket object owns.
InputStream* socketIStream = impl->socket->getInputStream();
OutputStream* sokcetOStream = impl->socket->getOutputStream();
Pointer<InputStream> inputStream;
Pointer<OutputStream> outputStream;
// If tcp tracing was enabled, wrap the input / output streams with logging streams
if (this->impl->trace) {
// Wrap with logging stream, we don't own the wrapped streams
inputStream.reset(new LoggingInputStream(socketIStream));
outputStream.reset(new LoggingOutputStream(sokcetOStream));
// Now wrap with the Buffered streams, we own the source streams
inputStream.reset(new BufferedInputStream(inputStream.release(), inputBufferSize, true));
outputStream.reset(new BufferedOutputStream(outputStream.release(), outputBufferSize, true));
} else {
// Wrap with the Buffered streams, we don't own the source streams
inputStream.reset(new BufferedInputStream(socketIStream, inputBufferSize));
outputStream.reset(new BufferedOutputStream(sokcetOStream, outputBufferSize));
}
// Now wrap the Buffered Streams with DataInput based streams. We own
// the Source streams, all the streams in the chain that we own are
// destroyed when these are.
this->impl->dataInputStream.reset(new DataInputStream(inputStream.release(), true));
this->impl->dataOutputStream.reset(new DataOutputStream(outputStream.release(), true));
// Give the IOTransport the streams.
ioTransport->setInputStream(impl->dataInputStream.get());
ioTransport->setOutputStream(impl->dataOutputStream.get());
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
Socket* TcpTransport::createSocket() {
try {
SocketFactory* factory = SocketFactory::getDefault();
return factory->createSocket();
}
DECAF_CATCH_RETHROW(IOException)
DECAF_CATCH_EXCEPTION_CONVERT(Exception, IOException)
DECAF_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::configureSocket(Socket* socket) {
try {
int soLinger = this->impl->soLinger;
int soReceiveBufferSize = this->impl->soReceiveBufferSize;
int soSendBufferSize = this->impl->soSendBufferSize;
// Set the socket options.
socket->setKeepAlive(this->impl->soKeepAlive);
socket->setTcpNoDelay(this->impl->tcpNoDelay);
if (soLinger > 0) {
socket->setSoLinger(true, soLinger);
}
if (soReceiveBufferSize > 0) {
socket->setReceiveBufferSize(soReceiveBufferSize);
}
if (soSendBufferSize > 0) {
socket->setSendBufferSize(soSendBufferSize);
}
}
DECAF_CATCH_RETHROW(NullPointerException)
DECAF_CATCH_RETHROW(IllegalArgumentException)
DECAF_CATCH_RETHROW(SocketException)
DECAF_CATCH_EXCEPTION_CONVERT(Exception, SocketException)
DECAF_CATCHALL_THROW(SocketException)
}
////////////////////////////////////////////////////////////////////////////////
bool TcpTransport::isConnected() const {
if (this->impl->socket.get() != NULL) {
return this->impl->socket->isConnected();
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setConnectTimeout(int soConnectTimeout) {
this->impl->connectTimeout = soConnectTimeout;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getConnectTimeout() const {
return this->impl->connectTimeout;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setOutputBufferSize(int outputBufferSize) {
this->impl->outputBufferSize = outputBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getOutputBufferSize() const {
return this->impl->outputBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setInputBufferSize(int inputBufferSize) {
this->impl->inputBufferSize = inputBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getInputBufferSize() const {
return this->impl->inputBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setTrace(bool trace) {
this->impl->trace = trace;
}
////////////////////////////////////////////////////////////////////////////////
bool TcpTransport::isTrace() const {
return this->impl->trace;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setLinger(int soLinger) {
this->impl->soLinger = soLinger;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getLinger() const {
return this->impl->soLinger;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setKeepAlive(bool soKeepAlive) {
this->impl->soKeepAlive = soKeepAlive;
}
////////////////////////////////////////////////////////////////////////////////
bool TcpTransport::isKeepAlive() const {
return this->impl->soKeepAlive;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setReceiveBufferSize(int soReceiveBufferSize) {
this->impl->soReceiveBufferSize = soReceiveBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getReceiveBufferSize() const {
return this->impl->soReceiveBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setSendBufferSize(int soSendBufferSize) {
this->impl->soSendBufferSize = soSendBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
int TcpTransport::getSendBufferSize() const {
return this->impl->soSendBufferSize;
}
////////////////////////////////////////////////////////////////////////////////
void TcpTransport::setTcpNoDelay(bool tcpNoDelay) {
this->impl->tcpNoDelay = tcpNoDelay;
}
////////////////////////////////////////////////////////////////////////////////
bool TcpTransport::isTcpNoDelay() const {
return this->impl->tcpNoDelay;
}
////////////////////////////////////////////////////////////////////////////////
decaf::net::URI TcpTransport::getLocation() const {
return this->impl->location;
}