blob: 9362e64b26e05cd1e69284829329dc318b723e05 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A BrokerFilter which partitions client connections over a cluster of brokers.
*
* It can use a client identifier like client id, authenticated user name, source ip
* address or even destination being used by the connection to figure out which
* is the best broker in the cluster that the connection should be using and then
* redirects failover clients to that broker.
*/
public class PartitionBroker extends BrokerFilter {
protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
protected final PartitionBrokerPlugin plugin;
protected boolean reloadConfigOnPoll = true;
public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
super(broker);
this.plugin = plugin;
}
@Override
public void start() throws Exception {
super.start();
getExecutor().execute(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("Partition Monitor");
onMonitorStart();
try {
runPartitionMonitor();
} catch (Exception e) {
onMonitorStop();
}
}
});
}
protected void onMonitorStart() {
}
protected void onMonitorStop() {
}
protected void runPartitionMonitor() {
while( !isStopped() ) {
try {
monitorWait();
} catch (InterruptedException e) {
break;
}
if(reloadConfigOnPoll) {
try {
reloadConfiguration();
} catch (Exception e) {
continue;
}
}
for( ConnectionMonitor monitor: monitors.values()) {
checkTarget(monitor);
}
}
}
protected void monitorWait() throws InterruptedException {
synchronized (this) {
this.wait(1000);
}
}
protected void monitorWakeup() {
synchronized (this) {
this.notifyAll();
}
}
protected void reloadConfiguration() throws Exception {
}
protected void checkTarget(ConnectionMonitor monitor) {
// can we find a preferred target for the connection?
Target targetDTO = pickBestBroker(monitor);
if( targetDTO == null || targetDTO.ids==null) {
LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
return;
}
// Are we one the the targets?
if( targetDTO.ids.contains(getBrokerName()) ) {
LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
return;
}
// Then we need to move the connection over.
String connectionString = getConnectionString(targetDTO.ids);
if( connectionString==null ) {
LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
return;
}
LOG.info("Redirecting connection to: " + connectionString);
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
ConnectionControl cc = new ConnectionControl();
cc.setConnectedBrokers(connectionString);
cc.setRebalanceConnection(true);
connection.dispatchAsync(cc);
}
protected String getConnectionString(HashSet<String> ids) {
StringBuilder rc = new StringBuilder();
for (String id : ids) {
String url = plugin.getBrokerURL(this, id);
if( url!=null ) {
if( rc.length()!=0 ) {
rc.append(',');
}
rc.append(url);
}
}
if( rc.length()==0 )
return null;
return rc.toString();
}
static private class Score {
int value;
}
protected Target pickBestBroker(ConnectionMonitor monitor) {
if( getConfig() ==null )
return null;
if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
Transport transport = connection.getTransport();
Socket socket = transport.narrow(Socket.class);
if( socket !=null ) {
SocketAddress address = socket.getRemoteSocketAddress();
if( address instanceof InetSocketAddress) {
String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
Target targetDTO = getConfig().bySourceIp.get(ip);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
}
if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
String userName = monitor.context.getUserName();
if( userName !=null ) {
Target targetDTO = getConfig().byUserName.get(userName);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
String clientId = monitor.context.getClientId();
if( clientId!=null ) {
Target targetDTO = getConfig().byClientId.get(clientId);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if(
(getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
|| (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
) {
// Collect the destinations the connection is consuming from...
HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
for (ConsumerState consumer : session.getConsumerStates()) {
ActiveMQDestination destination = consumer.getInfo().getDestination();
if( destination.isComposite() ) {
dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
} else {
dests.addAll(Collections.singletonList(destination));
}
}
}
// Group them by the partitioning target for the destinations and score them..
HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
for (ActiveMQDestination dest : dests) {
Target target = getTarget(dest);
if( target!=null ) {
Score score = targetScores.get(target);
if( score == null ) {
score = new Score();
targetScores.put(target, score);
}
score.value++;
}
}
// The target with largest score wins..
if (!targetScores.isEmpty()) {
Target bestTarget = null;
int bestScore = 0;
for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
if (entry.getValue().value > bestScore) {
bestTarget = entry.getKey();
bestScore = entry.getValue().value;
}
}
return bestTarget;
}
// If we get here is because there were no consumers, or the destinations for those
// consumers did not have an assigned destination.. So partition based on producer
// usage.
Target best = monitor.findBestProducerTarget(this);
if( best!=null ) {
return best;
}
}
return null;
}
protected Target getTarget(ActiveMQDestination dest) {
Partitioning config = getConfig();
if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
return config.byQueue.get(dest.getPhysicalName());
} else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
return config.byTopic.get(dest.getPhysicalName());
}
return null;
}
protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if( info.isFaultTolerant() ) {
ConnectionMonitor monitor = new ConnectionMonitor(context);
monitors.put(info.getConnectionId(), monitor);
super.addConnection(context, info);
checkTarget(monitor);
} else {
super.addConnection(context, info);
}
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
if( info.isFaultTolerant() ) {
monitors.remove(info.getConnectionId());
}
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
if( monitor!=null ) {
monitor.onSend(producerExchange, messageSend);
}
}
protected Partitioning getConfig() {
return plugin.getConfig();
}
static class Traffic {
long messages;
long bytes;
}
static class ConnectionMonitor {
final ConnectionContext context;
LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
public ConnectionMonitor(ConnectionContext context) {
this.context = context;
}
synchronized public Target findBestProducerTarget(PartitionBroker broker) {
Target best = null;
long bestSize = 0 ;
for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
Traffic t = entry.getValue();
// Once we get enough messages...
if( t.messages < broker.plugin.getMinTransferCount()) {
continue;
}
if( t.bytes > bestSize) {
bestSize = t.bytes;
Target target = broker.getTarget(entry.getKey());
if( target!=null ) {
best = target;
}
}
}
return best;
}
synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
ActiveMQDestination dest = message.getDestination();
Traffic traffic = trafficPerDestination.get(dest);
if( traffic == null ) {
traffic = new Traffic();
trafficPerDestination.put(dest, traffic);
}
traffic.messages += 1;
traffic.bytes += message.getSize();
}
}
}