blob: 8e58d6d51e0f39658f819c44b26f1ddb65154335 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TransportFilter.h"
#include <decaf/io/IOException.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <activemq/wireformat/WireFormat.h>
using namespace activemq;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace decaf::io;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace transport {
/**
 * Private state holder for TransportFilter.  It carries the two atomic
 * lifecycle flags the filter consults: whether it has been started and
 * whether it has been closed.
 */
class TransportFilterImpl {
public:

    // Flipped to true by close(); once set, the lifecycle methods return
    // early and checkClosed() throws.
    AtomicBoolean closed;

    // Flipped by start() / stop(); listener callbacks are only forwarded
    // while this flag is set.
    AtomicBoolean started;

public:

    TransportFilterImpl() : closed(), started() {
    }

private:

    // Copy construction and assignment are kept private: this object is
    // not meant to be copied.
    TransportFilterImpl(const TransportFilterImpl&);
    TransportFilterImpl& operator= (const TransportFilterImpl&);

};
}}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates a filter around the given transport and registers this filter as
 * that transport's listener.
 *
 * If the registration throws, the exception is propagated to the caller
 * after the internally allocated state object has been released.
 *
 * @param next
 *      The transport that this filter wraps and delegates to.
 */
TransportFilter::TransportFilter(const Pointer<Transport> next) :
    impl(new TransportFilterImpl()), next(next), listener(NULL) {

    try {
        // Observe the nested transport for events.
        next->setTransportListener(this);
    } catch (...) {
        // The destructor does not run for an object whose constructor
        // throws, so the state object allocated in the initializer list
        // has to be freed here to avoid leaking it.
        delete this->impl;
        throw;
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Tears the filter down in three separately guarded steps: close the
 * filter, drop the reference to the wrapped transport, and free the private
 * state object.  Each step has its own try block so that the later steps
 * are still attempted when an earlier one fails.
 */
TransportFilter::~TransportFilter() {

    // Step 1: shut the filter (and the transport it wraps) down.
    try {
        this->close();
    }
    AMQ_CATCHALL_NOTHROW()

    // Step 2: release the wrapped transport explicitly, inside a guarded
    // block, so that any stray exception raised while it is destroyed is
    // caught here.  This filter holds the only reference to next, so
    // resetting it should delete the transport.
    try {
        next.reset(NULL);
    }
    AMQ_CATCHALL_NOTHROW()

    // Step 3: free the private state object.
    try {
        delete impl;
    }
    AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Passes a received command on to the registered listener.
 *
 * The command is dropped when the filter has not been started, when it has
 * been closed, or when no listener is set.  Anything the listener throws is
 * discarded.
 *
 * @param command
 *      The command to forward.
 */
void TransportFilter::onCommand(const Pointer<Command> command) {

    const bool forwarding = this->impl->started.get() && !this->impl->closed.get();
    if (!forwarding) {
        return;
    }

    try {
        if (this->listener == NULL) {
            return;
        }
        this->listener->onCommand(command);
    } catch (...) {
        // Listener failures are intentionally not propagated.
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Passes an exception notification on to the registered listener.
 *
 * The notification is dropped when the filter has not been started, when it
 * has been closed, or when no listener is set.  Anything the listener
 * throws is discarded.
 *
 * @param ex
 *      The exception to report to the listener.
 */
void TransportFilter::onException(const decaf::lang::Exception& ex) {

    const bool forwarding = this->impl->started.get() && !this->impl->closed.get();
    if (!forwarding) {
        return;
    }

    if (this->listener == NULL) {
        return;
    }

    try {
        this->listener->onException(ex);
    } catch (...) {
        // Listener failures are intentionally not propagated.
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Tells the registered listener that the transport has been interrupted.
 *
 * The notification is dropped when the filter has not been started, when it
 * has been closed, or when no listener is set.  Anything the listener
 * throws is discarded.
 */
void TransportFilter::transportInterrupted() {

    const bool forwarding = this->impl->started.get() && !this->impl->closed.get();
    if (!forwarding) {
        return;
    }

    try {
        if (this->listener == NULL) {
            return;
        }
        this->listener->transportInterrupted();
    } catch (...) {
        // Listener failures are intentionally not propagated.
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Tells the registered listener that the transport has resumed.
 *
 * The notification is dropped when the filter has not been started, when it
 * has been closed, or when no listener is set.  Anything the listener
 * throws is discarded.
 */
void TransportFilter::transportResumed() {

    const bool forwarding = this->impl->started.get() && !this->impl->closed.get();
    if (!forwarding) {
        return;
    }

    try {
        if (this->listener == NULL) {
            return;
        }
        this->listener->transportResumed();
    } catch (...) {
        // Listener failures are intentionally not propagated.
    }
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Starts this filter and the transport it wraps.
 *
 * A closed filter is left untouched.  Otherwise a listener and a wrapped
 * transport must both be present.  The first call that flips the started
 * flag runs beforeNextIsStarted(), starts the wrapped transport and then
 * runs afterNextIsStarted(); calls made while already started do nothing.
 *
 * @throws IOException if no listener is set, if there is no wrapped
 *         transport, or if one of the start steps fails.
 */
void TransportFilter::start() {

    // Starting a closed filter is silently ignored.
    if (this->impl->closed.get()) {
        return;
    }

    if (this->listener == NULL) {
        throw IOException(__FILE__, __LINE__, "exceptionListener is invalid");
    }

    if (this->next == NULL) {
        throw IOException(__FILE__, __LINE__, "Transport chain is invalid");
    }

    try {

        // Only the caller that moves the flag from false to true performs
        // the start sequence.
        const bool claimedStart = this->impl->started.compareAndSet(false, true);
        if (!claimedStart) {
            return;
        }

        this->beforeNextIsStarted();
        this->next->start();
        this->afterNextIsStarted();
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::stop() {
if (this->impl->closed.get()) {
return;
}
try {
if (this->impl->started.compareAndSet(true, false)) {
if (this->next == NULL) {
throw decaf::io::IOException(__FILE__, __LINE__, "Transport chain is invalid");
}
IOException error;
bool hasException = false;
try {
beforeNextIsStopped();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
try {
next->stop();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
try {
afterNextIsStopped();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
if (hasException) {
throw error;
}
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::close() {
if (this->impl->closed.get()) {
return;
}
try {
IOException error;
bool hasException = false;
try {
stop();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
if (this->impl->closed.compareAndSet(false, true)) {
if (this->next == NULL) {
throw decaf::io::IOException(__FILE__, __LINE__, "Transport chain is invalid");
}
next->setTransportListener(NULL);
try {
next->close();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
try {
doClose();
} catch (IOException& ex) {
error = ex;
error.setMark(__FILE__, __LINE__);
hasException = true;
}
}
if (hasException) {
throw error;
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Looks for a transport of the requested dynamic type, starting with this
 * filter and then delegating to the wrapped transport.
 *
 * @param typeId
 *      The type_info of the transport type being searched for.
 *
 * @return this filter when its dynamic type equals typeId, otherwise the
 *         result of asking the wrapped transport, or NULL when there is no
 *         wrapped transport.
 */
Transport* TransportFilter::narrow(const std::type_info& typeId) {

    const bool matchesSelf = (typeid(*this) == typeId);
    if (matchesSelf) {
        return this;
    }

    if (this->next == NULL) {
        return NULL;
    }

    return this->next->narrow(typeId);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Asks the wrapped transport to reconnect to the given URI.
 *
 * @param uri
 *      The URI handed to the wrapped transport's reconnect().
 *
 * @throws IOException if this filter is closed or if the wrapped
 *         transport's reconnect fails.
 */
void TransportFilter::reconnect(const decaf::net::URI& uri) {

    // Rejects the call with an IOException when the filter is closed.
    this->checkClosed();

    try {
        this->next->reconnect(uri);
    }
    AMQ_CATCH_RETHROW(IOException)
    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
    AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Returns the wire format reported by the wrapped transport.
 *
 * @return the value of the wrapped transport's getWireFormat().
 *
 * @throws IOException if this filter is closed.
 */
Pointer<wireformat::WireFormat> TransportFilter::getWireFormat() const {

    // Rejects the call with an IOException when the filter is closed.
    this->checkClosed();

    return this->next->getWireFormat();
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
checkClosed();
next->setWireFormat(wireFormat);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reports whether this filter has been closed.
 *
 * @return the current value of the internal closed flag.
 */
bool TransportFilter::isClosed() const {
    const bool closedNow = this->impl->closed.get();
    return closedNow;
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Guard used by operations that require an open filter.
 *
 * @throws IOException if the internal closed flag is set.
 */
void TransportFilter::checkClosed() const {

    const bool stillOpen = !this->impl->closed.get();
    if (stillOpen) {
        return;
    }

    throw IOException(__FILE__, __LINE__, "Transport is closed");
}