blob: ddc020b575ca4c1f2e49df41226b2c63a7c86d9b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
public class TransportConnector implements Connector, BrokerServiceAware {
final Logger LOG = LoggerFactory.getLogger(TransportConnector.class);
protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector;
private BrokerService brokerService;
private TransportServer server;
private URI uri;
private BrokerInfo brokerInfo = new BrokerInfo();
private TaskRunnerFactory taskRunnerFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private DiscoveryAgent discoveryAgent;
private final ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri;
private String name;
private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = false;
private Broker broker;
private boolean updateClusterClients = false;
private boolean rebalanceClusterClients;
private boolean updateClusterClientsOnRemove = false;
private String updateClusterFilter;
private boolean auditNetworkProducers = false;
private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE;
private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE;
private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy();
private boolean warnOnRemoteClose = false;
LinkedList<String> peerBrokers = new LinkedList<String>();
public TransportConnector() {
}
public TransportConnector(TransportServer server) {
this();
setServer(server);
if (server != null && server.getConnectURI() != null) {
URI uri = server.getConnectURI();
if (uri != null && uri.getScheme().equals("vm")) {
setEnableStatusMonitor(false);
}
}
}
/**
* @return Returns the connections.
*/
public CopyOnWriteArrayList<TransportConnection> getConnections() {
return connections;
}
/**
* Factory method to create a JMX managed version of this transport
* connector
*/
public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException {
ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer());
rc.setBrokerInfo(getBrokerInfo());
rc.setDisableAsyncDispatch(isDisableAsyncDispatch());
rc.setDiscoveryAgent(getDiscoveryAgent());
rc.setDiscoveryUri(getDiscoveryUri());
rc.setEnableStatusMonitor(isEnableStatusMonitor());
rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
rc.setName(getName());
rc.setTaskRunnerFactory(getTaskRunnerFactory());
rc.setUri(getUri());
rc.setBrokerService(brokerService);
rc.setUpdateClusterClients(isUpdateClusterClients());
rc.setRebalanceClusterClients(isRebalanceClusterClients());
rc.setUpdateClusterFilter(getUpdateClusterFilter());
rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove());
rc.setAuditNetworkProducers(isAuditNetworkProducers());
rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection());
rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection());
rc.setPublishedAddressPolicy(getPublishedAddressPolicy());
rc.setWarnOnRemoteClose(isWarnOnRemoteClose());
return rc;
}
@Override
public BrokerInfo getBrokerInfo() {
return brokerInfo;
}
public void setBrokerInfo(BrokerInfo brokerInfo) {
this.brokerInfo = brokerInfo;
}
public TransportServer getServer() throws IOException, URISyntaxException {
if (server == null) {
setServer(createTransportServer());
}
return server;
}
public void setServer(TransportServer server) {
this.server = server;
}
public URI getUri() {
if (uri == null) {
try {
uri = getConnectUri();
} catch (Throwable e) {
}
}
return uri;
}
/**
* Sets the server transport URI to use if there is not a
* {@link TransportServer} configured via the
* {@link #setServer(TransportServer)} method. This value is used to lazy
* create a {@link TransportServer} instance
*
* @param uri
*/
public void setUri(URI uri) {
this.uri = uri;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;
}
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
}
/**
* @return the statistics for this connector
*/
@Override
public ConnectorStatistics getStatistics() {
return statistics;
}
public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
return messageAuthorizationPolicy;
}
/**
* Sets the policy used to decide if the current connection is authorized to
* consume a given message
*/
public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
this.messageAuthorizationPolicy = messageAuthorizationPolicy;
}
@Override
public void start() throws Exception {
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
getServer().setAcceptListener(new TransportAcceptListener() {
@Override
public void onAccept(final Transport transport) {
final String remoteHost = transport.getRemoteAddress();
try {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
if (!brokerService.isStopping()) {
Connection connection = createConnection(transport);
connection.start();
} else {
throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
}
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e, remoteHost);
}
}
});
} catch (Exception e) {
ServiceSupport.dispose(transport);
onAcceptError(e, remoteHost);
}
}
@Override
public void onAcceptError(Exception error) {
onAcceptError(error, null);
}
private void onAcceptError(Exception error, String remoteHost) {
if (brokerService != null && brokerService.isStopping()) {
LOG.info("Could not accept connection during shutdown {} : {} ({})", (remoteHost == null ? "" : "from " + remoteHost), error.getLocalizedMessage(), getRootCause(error).getMessage());
} else {
LOG.warn("Could not accept connection from {}: {} ({})", (remoteHost == null ? "" : "from " + remoteHost), error.getMessage(), getRootCause(error).getMessage());
LOG.debug("Reason: " + error.getMessage(), error);
}
}
});
getServer().setBrokerInfo(brokerInfo);
getServer().start();
DiscoveryAgent da = getDiscoveryAgent();
if (da != null) {
da.registerService(getPublishableConnectString());
da.start();
}
if (enableStatusMonitor) {
this.statusDector = new TransportStatusDetector(this);
this.statusDector.start();
}
LOG.info("Connector {} started", getName());
}
static Throwable getRootCause(final Throwable throwable) {
final List<Throwable> list = getThrowableList(throwable);
return list.isEmpty() ? null : list.get(list.size() - 1);
}
static List<Throwable> getThrowableList(Throwable throwable) {
final List<Throwable> list = new ArrayList<>();
while (throwable != null && !list.contains(throwable)) {
list.add(throwable);
throwable = throwable.getCause();
}
return list;
}
public String getPublishableConnectString() throws Exception {
String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this);
LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri());
return publishableConnectString;
}
public URI getPublishableConnectURI() throws Exception {
return publishedAddressPolicy.getPublishableConnectURI(this);
}
@Override
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
if (discoveryAgent != null) {
ss.stop(discoveryAgent);
}
if (server != null) {
ss.stop(server);
}
if (this.statusDector != null) {
this.statusDector.stop();
}
for (TransportConnection connection : connections) {
ss.stop(connection);
}
server = null;
ss.throwFirstException();
LOG.info("Connector {} stopped", getName());
}
// Implementation methods
// -------------------------------------------------------------------------
protected Connection createConnection(Transport transport) throws IOException {
// prefer to use task runner from broker service as stop task runner, as we can then
// tie it to the lifecycle of the broker service
TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null
: taskRunnerFactory, brokerService.getTaskRunnerFactory());
boolean statEnabled = this.getStatistics().isEnabled();
answer.getStatistics().setEnabled(statEnabled);
answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy);
return answer;
}
protected TransportServer createTransportServer() throws IOException, URISyntaxException {
if (uri == null) {
throw new IllegalArgumentException("You must specify either a server or uri property");
}
if (brokerService == null) {
throw new IllegalArgumentException(
"You must specify the brokerService property. Maybe this connector should be added to a broker?");
}
return TransportFactorySupport.bind(brokerService, uri);
}
public DiscoveryAgent getDiscoveryAgent() throws IOException {
if (discoveryAgent == null) {
discoveryAgent = createDiscoveryAgent();
}
return discoveryAgent;
}
protected DiscoveryAgent createDiscoveryAgent() throws IOException {
if (discoveryUri != null) {
DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
if (agent != null && agent instanceof BrokerServiceAware) {
((BrokerServiceAware) agent).setBrokerService(brokerService);
}
return agent;
}
return null;
}
public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
this.discoveryAgent = discoveryAgent;
}
public URI getDiscoveryUri() {
return discoveryUri;
}
public void setDiscoveryUri(URI discoveryUri) {
this.discoveryUri = discoveryUri;
}
public URI getConnectUri() throws IOException, URISyntaxException {
if (server != null) {
return server.getConnectURI();
} else {
return uri;
}
}
public void onStarted(TransportConnection connection) {
connections.add(connection);
}
public void onStopped(TransportConnection connection) {
connections.remove(connection);
}
public String getName() {
if (name == null) {
uri = getUri();
if (uri != null) {
name = uri.toString();
}
}
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
String rc = getName();
if (rc == null) {
rc = super.toString();
}
return rc;
}
protected ConnectionControl getConnectionControl() {
boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = "";
String separator = "";
if (isUpdateClusterClients()) {
synchronized (peerBrokers) {
for (String uri : getPeerBrokers()) {
connectedBrokers += separator + uri;
separator = ",";
}
if (rebalance) {
String shuffle = peerBrokers.removeFirst();
peerBrokers.addLast(shuffle);
}
}
}
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
return control;
}
public void addPeerBroker(BrokerInfo info) {
if (isMatchesClusterFilter(info.getBrokerName())) {
synchronized (peerBrokers) {
getPeerBrokers().addLast(info.getBrokerURL());
}
}
}
public void removePeerBroker(BrokerInfo info) {
synchronized (peerBrokers) {
getPeerBrokers().remove(info.getBrokerURL());
}
}
public LinkedList<String> getPeerBrokers() {
synchronized (peerBrokers) {
if (peerBrokers.isEmpty()) {
peerBrokers.add(brokerService.getDefaultSocketURIString());
}
return peerBrokers;
}
}
@Override
public void updateClientClusterInfo() {
if (isRebalanceClusterClients() || isUpdateClusterClients()) {
ConnectionControl control = getConnectionControl();
for (Connection c : this.connections) {
c.updateClient(control);
if (isRebalanceClusterClients()) {
control = getConnectionControl();
}
}
}
}
private boolean isMatchesClusterFilter(String brokerName) {
boolean result = true;
String filter = getUpdateClusterFilter();
if (filter != null) {
filter = filter.trim();
if (filter.length() > 0) {
result = false;
StringTokenizer tokenizer = new StringTokenizer(filter, ",");
while (!result && tokenizer.hasMoreTokens()) {
String token = tokenizer.nextToken();
result = isMatchesClusterFilter(brokerName, token);
}
}
}
return result;
}
private boolean isMatchesClusterFilter(String brokerName, String match) {
boolean result = false;
if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) {
result = Pattern.matches(match, brokerName);
}
return result;
}
public boolean isDisableAsyncDispatch() {
return disableAsyncDispatch;
}
public void setDisableAsyncDispatch(boolean disableAsyncDispatch) {
this.disableAsyncDispatch = disableAsyncDispatch;
}
/**
* @return the enableStatusMonitor
*/
public boolean isEnableStatusMonitor() {
return enableStatusMonitor;
}
/**
* @param enableStatusMonitor
* the enableStatusMonitor to set
*/
public void setEnableStatusMonitor(boolean enableStatusMonitor) {
this.enableStatusMonitor = enableStatusMonitor;
}
/**
* This is called by the BrokerService right before it starts the transport.
*/
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
public Broker getBroker() {
return broker;
}
public BrokerService getBrokerService() {
return brokerService;
}
/**
* @return the updateClusterClients
*/
@Override
public boolean isUpdateClusterClients() {
return this.updateClusterClients;
}
/**
* @param updateClusterClients
* the updateClusterClients to set
*/
public void setUpdateClusterClients(boolean updateClusterClients) {
this.updateClusterClients = updateClusterClients;
}
/**
* @return the rebalanceClusterClients
*/
@Override
public boolean isRebalanceClusterClients() {
return this.rebalanceClusterClients;
}
/**
* @param rebalanceClusterClients
* the rebalanceClusterClients to set
*/
public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
this.rebalanceClusterClients = rebalanceClusterClients;
}
/**
* @return the updateClusterClientsOnRemove
*/
@Override
public boolean isUpdateClusterClientsOnRemove() {
return this.updateClusterClientsOnRemove;
}
/**
* @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set
*/
public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) {
this.updateClusterClientsOnRemove = updateClusterClientsOnRemove;
}
/**
* @return the updateClusterFilter
*/
@Override
public String getUpdateClusterFilter() {
return this.updateClusterFilter;
}
/**
* @param updateClusterFilter
* the updateClusterFilter to set
*/
public void setUpdateClusterFilter(String updateClusterFilter) {
this.updateClusterFilter = updateClusterFilter;
}
@Override
public int connectionCount() {
return connections.size();
}
@Override
public boolean isAllowLinkStealing() {
return server.isAllowLinkStealing();
}
public boolean isAuditNetworkProducers() {
return auditNetworkProducers;
}
/**
* Enable a producer audit on network connections, Traps the case of a missing send reply and resend.
* Note: does not work with conduit=false, networked composite destinations or networked virtual topics
* @param auditNetworkProducers
*/
public void setAuditNetworkProducers(boolean auditNetworkProducers) {
this.auditNetworkProducers = auditNetworkProducers;
}
public int getMaximumProducersAllowedPerConnection() {
return maximumProducersAllowedPerConnection;
}
public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) {
this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection;
}
public int getMaximumConsumersAllowedPerConnection() {
return maximumConsumersAllowedPerConnection;
}
public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) {
this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection;
}
/**
* Gets the currently configured policy for creating the published connection address of this
* TransportConnector.
*
* @return the publishedAddressPolicy
*/
public PublishedAddressPolicy getPublishedAddressPolicy() {
return publishedAddressPolicy;
}
/**
* Sets the configured policy for creating the published connection address of this
* TransportConnector.
*
* @return the publishedAddressPolicy
*/
public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) {
this.publishedAddressPolicy = publishedAddressPolicy;
}
public boolean isWarnOnRemoteClose() {
return warnOnRemoteClose;
}
public void setWarnOnRemoteClose(boolean warnOnRemoteClose) {
this.warnOnRemoteClose = warnOnRemoteClose;
}
}