blob: 4100c802323f1c6822ed51ddc775b04cad1b09b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ResponseCorrelator.h"
#include <algorithm>
#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/HashMap.h>
#include <activemq/commands/Response.h>
#include <activemq/commands/ExceptionResponse.h>
#include <activemq/transport/FutureResponse.h>
using namespace std;
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::transport::correlator;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace {
class ResponseFinalizer {
private:
ResponseFinalizer(const ResponseFinalizer&);
ResponseFinalizer operator=(const ResponseFinalizer&);
private:
Mutex* mutex;
int commandId;
HashMap<unsigned int, Pointer<FutureResponse> >* map;
public:
ResponseFinalizer(Mutex* mutex, int commandId, HashMap<unsigned int, Pointer<FutureResponse> >* map) :
mutex(mutex), commandId(commandId), map(map) {
}
~ResponseFinalizer() {
synchronized(mutex){
try {
map->remove(commandId);
} catch (...) {}
}
}
};
}
////////////////////////////////////////////////////////////////////////////////
namespace activemq{
namespace transport{
namespace correlator{
class CorrelatorData {
public:
// The next command id for sent commands.
decaf::util::concurrent::atomic::AtomicInteger nextCommandId;
// Map of request ids to future response objects.
HashMap<unsigned int, Pointer<FutureResponse> > requestMap;
// Sync object for accessing the request map.
decaf::util::concurrent::Mutex mapMutex;
// Indicates that an the filter is now unusable from some error.
Pointer<Exception> priorError;
public:
CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), priorError(NULL) {}
};
}}}
////////////////////////////////////////////////////////////////////////////////
ResponseCorrelator::ResponseCorrelator(Pointer<Transport> next) : TransportFilter(next), impl(new CorrelatorData) {
}
////////////////////////////////////////////////////////////////////////////////
ResponseCorrelator::~ResponseCorrelator() {
// Close the transport and destroy it.
try {
close();
}
AMQ_CATCHALL_NOTHROW()
delete this->impl;
}
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::oneway(const Pointer<Command> command) {
try {
checkClosed();
command->setCommandId(this->impl->nextCommandId.getAndIncrement());
command->setResponseRequired(false);
next->oneway(command);
}
AMQ_CATCH_RETHROW(UnsupportedOperationException)
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
Pointer<FutureResponse> ResponseCorrelator::asyncRequest(const Pointer<Command> command, const Pointer<ResponseCallback> responseCallback) {
try {
checkClosed();
command->setCommandId(this->impl->nextCommandId.getAndIncrement());
command->setResponseRequired(true);
// Add a future response object to the map indexed by this command id.
Pointer<FutureResponse> futureResponse(new FutureResponse(responseCallback));
Pointer<Exception> priorError;
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}
if (priorError != NULL) {
Pointer<commands::BrokerError> exception(new commands::BrokerError(priorError));
Pointer<commands::ExceptionResponse> response(new commands::ExceptionResponse);
response->setException(exception);
futureResponse->setResponse(response);
throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
// Send the request.
try {
next->oneway(command);
} catch (Exception &ex) {
// We have to ensure this gets cleaned out otherwise we can consume memory over time.
this->impl->requestMap.remove(command->getCommandId());
throw;
}
return futureResponse;
}
AMQ_CATCH_RETHROW(UnsupportedOperationException)
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command) {
try {
checkClosed();
command->setCommandId(this->impl->nextCommandId.getAndIncrement());
command->setResponseRequired(true);
// Add a future response object to the map indexed by this command id.
Pointer<FutureResponse> futureResponse(new FutureResponse());
Pointer<Exception> priorError;
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}
if (priorError != NULL) {
throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
// The finalizer will cleanup the map even if an exception is thrown.
ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
// Wait to be notified of the response via the futureResponse object.
Pointer<commands::Response> response;
// Send the request.
next->oneway(command);
// Get the response.
response = futureResponse->getResponse();
if (response == NULL) {
throw IOException(__FILE__, __LINE__,
"No valid response received for command: %s, check broker.", command->toString().c_str());
}
return response;
}
AMQ_CATCH_RETHROW(UnsupportedOperationException)
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command, unsigned int timeout) {
try {
checkClosed();
command->setCommandId(this->impl->nextCommandId.getAndIncrement());
command->setResponseRequired(true);
// Add a future response object to the map indexed by this command id.
Pointer<FutureResponse> futureResponse(new FutureResponse());
Pointer<Exception> priorError;
synchronized(&this->impl->mapMutex) {
priorError = this->impl->priorError;
if (priorError == NULL) {
this->impl->requestMap.put((unsigned int) command->getCommandId(), futureResponse);
}
}
if (priorError != NULL) {
throw IOException(__FILE__, __LINE__, this->impl->priorError->getMessage().c_str());
}
// The finalizer will cleanup the map even if an exception is thrown.
ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
// Wait to be notified of the response via the futureResponse object.
Pointer<commands::Response> response;
// Send the request.
next->oneway(command);
// Get the response.
response = futureResponse->getResponse(timeout);
if (response == NULL) {
throw IOException(__FILE__, __LINE__,
"No valid response received for command: %s, check broker.", command->toString().c_str());
}
return response;
}
AMQ_CATCH_RETHROW(UnsupportedOperationException)
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::onCommand(const Pointer<Command> command) {
// Let's see if the incoming command is a response, if not we just pass it along
// and allow outstanding requests to keep waiting without stalling control commands.
if (!command->isResponse()) {
TransportFilter::onCommand(command);
return;
}
Pointer<Response> response = command.dynamicCast<Response>();
// It is a response - let's correlate ...
synchronized(&this->impl->mapMutex) {
Pointer<FutureResponse> futureResponse;
try {
futureResponse = this->impl->requestMap.remove(response->getCorrelationId());
} catch (NoSuchElementException& ex) {
return;
}
// Set the response property in the future response.
futureResponse->setResponse(response);
}
}
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::doClose() {
try {
dispose(Pointer<Exception>(new IOException(__FILE__, __LINE__, "Transport Stopped")));
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::onException(const decaf::lang::Exception& ex) {
dispose(Pointer<Exception>(ex.clone()));
TransportFilter::onException(ex);
}
////////////////////////////////////////////////////////////////////////////////
void ResponseCorrelator::dispose(Pointer<Exception> error) {
ArrayList<Pointer<FutureResponse> > requests;
synchronized(&this->impl->mapMutex) {
if (this->impl->priorError == NULL) {
this->impl->priorError = error;
requests.ensureCapacity((int)this->impl->requestMap.size());
requests.copy(this->impl->requestMap.values());
this->impl->requestMap.clear();
}
}
if (!requests.isEmpty()) {
Pointer<commands::BrokerError> exception(new commands::BrokerError);
exception->setExceptionClass("java.io.IOException");
exception->setMessage(error->getMessage());
Pointer<commands::ExceptionResponse> errorResponse(new commands::ExceptionResponse);
errorResponse->setException(exception);
Pointer<Iterator<Pointer<FutureResponse> > > iter(requests.iterator());
while (iter->hasNext()) {
Pointer<FutureResponse> response = iter->next();
response->setResponse(errorResponse);
}
}
}