blob: a9b9aad461265e1261590bf295a9727aa08719ea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.fanout;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Transport that fans out a connection to multiple brokers.
*
*
*/
public class FanoutTransport implements CompositeTransport {
private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
private TransportListener transportListener;
private boolean disposed;
private boolean connected;
private final Object reconnectMutex = new Object();
private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
private final TaskRunner reconnectTask;
private boolean started;
private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
private int connectedCount;
private int minAckCount = 2;
private long initialReconnectDelay = 10;
private long maxReconnectDelay = 1000 * 30;
private long backOffMultiplier = 2;
private final boolean useExponentialBackOff = true;
private int maxReconnectAttempts;
private Exception connectionFailure;
private FanoutTransportHandler primary;
private boolean fanOutQueues = false;
static class RequestCounter {
final Command command;
final AtomicInteger ackCount;
RequestCounter(Command command, int count) {
this.command = command;
this.ackCount = new AtomicInteger(count);
}
@Override
public String toString() {
return command.getCommandId() + "=" + ackCount.get();
}
}
class FanoutTransportHandler extends DefaultTransportListener {
private final URI uri;
private Transport transport;
private int connectFailures;
private long reconnectDelay = initialReconnectDelay;
private long reconnectDate;
public FanoutTransportHandler(URI uri) {
this.uri = uri;
}
@Override
public void onCommand(Object o) {
Command command = (Command)o;
if (command.isResponse()) {
Integer id = new Integer(((Response)command).getCorrelationId());
RequestCounter rc = requestMap.get(id);
if (rc != null) {
if (rc.ackCount.decrementAndGet() <= 0) {
requestMap.remove(id);
transportListenerOnCommand(command);
}
} else {
transportListenerOnCommand(command);
}
} else {
transportListenerOnCommand(command);
}
}
@Override
public void onException(IOException error) {
try {
synchronized (reconnectMutex) {
if (transport == null || !transport.isConnected()) {
return;
}
LOG.debug("Transport failed, starting up reconnect task", error);
ServiceSupport.dispose(transport);
transport = null;
connectedCount--;
if (primary == this) {
primary = null;
}
reconnectTask.wakeup();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (transportListener != null) {
transportListener.onException(new InterruptedIOException());
}
}
}
}
public FanoutTransport() throws InterruptedIOException {
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
public boolean iterate() {
return doConnect();
}
}, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
}
/**
* @return
*/
private boolean doConnect() {
long closestReconnectDate = 0;
synchronized (reconnectMutex) {
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
if (transports.size() == connectedCount || disposed || connectionFailure != null) {
return false;
} else {
if (transports.isEmpty()) {
// connectionFailure = new IOException("No uris available to
// connect to.");
} else {
// Try to connect them up.
Iterator<FanoutTransportHandler> iter = transports.iterator();
for (int i = 0; iter.hasNext() && !disposed; i++) {
long now = System.currentTimeMillis();
FanoutTransportHandler fanoutHandler = iter.next();
if (fanoutHandler.transport != null) {
continue;
}
// Are we waiting a little to try to reconnect this one?
if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
closestReconnectDate = fanoutHandler.reconnectDate;
}
continue;
}
URI uri = fanoutHandler.uri;
try {
LOG.debug("Stopped: " + this);
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
fanoutHandler.transport = t;
t.setTransportListener(fanoutHandler);
if (started) {
restoreTransport(fanoutHandler);
}
LOG.debug("Connection established");
fanoutHandler.reconnectDelay = initialReconnectDelay;
fanoutHandler.connectFailures = 0;
if (primary == null) {
primary = fanoutHandler;
}
connectedCount++;
} catch (Exception e) {
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
if( fanoutHandler.transport !=null ) {
ServiceSupport.dispose(fanoutHandler.transport);
fanoutHandler.transport=null;
}
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
connectionFailure = e;
reconnectMutex.notifyAll();
return false;
} else {
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
fanoutHandler.reconnectDelay *= backOffMultiplier;
if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
fanoutHandler.reconnectDelay = maxReconnectDelay;
}
}
fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
closestReconnectDate = fanoutHandler.reconnectDate;
}
}
}
}
if (transports.size() == connectedCount || disposed) {
reconnectMutex.notifyAll();
return false;
}
}
}
}
try {
long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
if (reconnectDelay > 0) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
Thread.sleep(reconnectDelay);
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return true;
}
public void start() throws Exception {
synchronized (reconnectMutex) {
LOG.debug("Started.");
if (started) {
return;
}
started = true;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.transport != null) {
restoreTransport(th);
}
}
connected=true;
}
}
public void stop() throws Exception {
synchronized (reconnectMutex) {
ServiceStopper ss = new ServiceStopper();
if (!started) {
return;
}
started = false;
disposed = true;
connected=false;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.transport != null) {
ss.stop(th.transport);
}
}
LOG.debug("Stopped: " + this);
ss.throwFirstException();
}
reconnectTask.shutdown();
}
public int getMinAckCount() {
return minAckCount;
}
public void setMinAckCount(int minAckCount) {
this.minAckCount = minAckCount;
}
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
public void setInitialReconnectDelay(long initialReconnectDelay) {
this.initialReconnectDelay = initialReconnectDelay;
}
public long getMaxReconnectDelay() {
return maxReconnectDelay;
}
public void setMaxReconnectDelay(long maxReconnectDelay) {
this.maxReconnectDelay = maxReconnectDelay;
}
public long getReconnectDelayExponent() {
return backOffMultiplier;
}
public void setReconnectDelayExponent(long reconnectDelayExponent) {
this.backOffMultiplier = reconnectDelayExponent;
}
public int getMaxReconnectAttempts() {
return maxReconnectAttempts;
}
public void setMaxReconnectAttempts(int maxReconnectAttempts) {
this.maxReconnectAttempts = maxReconnectAttempts;
}
public void oneway(Object o) throws IOException {
final Command command = (Command)o;
try {
synchronized (reconnectMutex) {
// Wait for transport to be connected.
while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
reconnectMutex.wait(1000);
}
// Still not fully connected.
if (connectedCount < minAckCount) {
Exception error;
// Throw the right kind of error..
if (disposed) {
error = new IOException("Transport disposed.");
} else if (connectionFailure != null) {
error = connectionFailure;
} else {
error = new IOException("Unexpected failure.");
}
if (error instanceof IOException) {
throw (IOException)error;
}
throw IOExceptionSupport.create(error);
}
// If it was a request and it was not being tracked by
// the state tracker,
// then hold it in the requestMap so that we can replay
// it later.
boolean fanout = isFanoutCommand(command);
if (stateTracker.track(command) == null && command.isResponseRequired()) {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}
// Send the message.
if (fanout) {
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.transport != null) {
try {
th.transport.oneway(command);
} catch (IOException e) {
LOG.debug("Send attempt: failed.");
th.onException(e);
}
}
}
} else {
try {
primary.transport.oneway(command);
} catch (IOException e) {
LOG.debug("Send attempt: failed.");
primary.onException(e);
}
}
}
} catch (InterruptedException e) {
// Some one may be trying to stop our thread.
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
}
/**
* @param command
* @return
*/
private boolean isFanoutCommand(Command command) {
if (command.isMessage()) {
if( fanOutQueues ) {
return true;
}
return ((Message)command).getDestination().isTopic();
}
if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
return false;
}
return true;
}
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Object request(Object command, int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
public void reconnect() {
LOG.debug("Waking up reconnect task");
try {
reconnectTask.wakeup();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public TransportListener getTransportListener() {
return transportListener;
}
public void setTransportListener(TransportListener commandListener) {
this.transportListener = commandListener;
}
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
}
synchronized (reconnectMutex) {
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.transport != null) {
T rc = th.transport.narrow(target);
if (rc != null) {
return rc;
}
}
}
}
return null;
}
protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
th.transport.start();
stateTracker.setRestoreConsumers(th.transport == primary);
stateTracker.restore(th.transport);
for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
RequestCounter rc = iter2.next();
th.transport.oneway(rc.command);
}
}
public void add(boolean reblance,URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
boolean match = false;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.uri.equals(uri)) {
match = true;
break;
}
}
if (!match) {
FanoutTransportHandler th = new FanoutTransportHandler(uri);
transports.add(th);
reconnect();
}
}
}
}
public void remove(boolean rebalance,URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
if (th.uri.equals(uri)) {
if (th.transport != null) {
ServiceSupport.dispose(th.transport);
connectedCount--;
}
iter.remove();
break;
}
}
}
}
}
public void reconnect(URI uri) throws IOException {
add(true,new URI[]{uri});
}
public boolean isReconnectSupported() {
return true;
}
public boolean isUpdateURIsSupported() {
return true;
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
add(reblance,uris);
}
public String getRemoteAddress() {
if (primary != null) {
if (primary.transport != null) {
return primary.transport.getRemoteAddress();
}
}
return null;
}
protected void transportListenerOnCommand(Command command) {
if (transportListener != null) {
transportListener.onCommand(command);
}
}
public boolean isFaultTolerant() {
return true;
}
public boolean isFanOutQueues() {
return fanOutQueues;
}
public void setFanOutQueues(boolean fanOutQueues) {
this.fanOutQueues = fanOutQueues;
}
public boolean isDisposed() {
return disposed;
}
public boolean isConnected() {
return connected;
}
public int getReceiveCounter() {
int rc = 0;
synchronized (reconnectMutex) {
for (FanoutTransportHandler th : transports) {
if (th.transport != null) {
rc += th.transport.getReceiveCounter();
}
}
}
return rc;
}
}