| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.partition; |
| |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.activemq.broker.Broker; |
| import org.apache.activemq.broker.BrokerFilter; |
| import org.apache.activemq.broker.ConnectionContext; |
| import org.apache.activemq.broker.ProducerBrokerExchange; |
| import org.apache.activemq.broker.TransportConnection; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ConnectionControl; |
| import org.apache.activemq.command.ConnectionId; |
| import org.apache.activemq.command.ConnectionInfo; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.partition.dto.Partitioning; |
| import org.apache.activemq.partition.dto.Target; |
| import org.apache.activemq.state.ConsumerState; |
| import org.apache.activemq.state.SessionState; |
| import org.apache.activemq.transport.Transport; |
| import org.apache.activemq.util.LRUCache; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A BrokerFilter which partitions client connections over a cluster of brokers. |
| * |
| * It can use a client identifier like client id, authenticated user name, source ip |
| * address or even destination being used by the connection to figure out which |
| * is the best broker in the cluster that the connection should be using and then |
| * redirects failover clients to that broker. |
| */ |
| public class PartitionBroker extends BrokerFilter { |
| |
| protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class); |
| protected final PartitionBrokerPlugin plugin; |
| protected boolean reloadConfigOnPoll = true; |
| |
| public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) { |
| super(broker); |
| this.plugin = plugin; |
| } |
| |
| @Override |
| public void start() throws Exception { |
| super.start(); |
| getExecutor().execute(new Runnable() { |
| @Override |
| public void run() { |
| Thread.currentThread().setName("Partition Monitor"); |
| onMonitorStart(); |
| try { |
| runPartitionMonitor(); |
| } catch (Exception e) { |
| onMonitorStop(); |
| } |
| } |
| }); |
| } |
| |
| protected void onMonitorStart() { |
| } |
| protected void onMonitorStop() { |
| } |
| |
| protected void runPartitionMonitor() { |
| while( !isStopped() ) { |
| try { |
| monitorWait(); |
| } catch (InterruptedException e) { |
| break; |
| } |
| |
| if(reloadConfigOnPoll) { |
| try { |
| reloadConfiguration(); |
| } catch (Exception e) { |
| continue; |
| } |
| } |
| |
| for( ConnectionMonitor monitor: monitors.values()) { |
| checkTarget(monitor); |
| } |
| } |
| } |
| |
| protected void monitorWait() throws InterruptedException { |
| synchronized (this) { |
| this.wait(1000); |
| } |
| } |
| |
| protected void monitorWakeup() { |
| synchronized (this) { |
| this.notifyAll(); |
| } |
| } |
| |
| protected void reloadConfiguration() throws Exception { |
| } |
| |
| protected void checkTarget(ConnectionMonitor monitor) { |
| |
| // can we find a preferred target for the connection? |
| Target targetDTO = pickBestBroker(monitor); |
| if( targetDTO == null || targetDTO.ids==null) { |
| LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId()); |
| return; |
| } |
| |
| // Are we one the the targets? |
| if( targetDTO.ids.contains(getBrokerName()) ) { |
| LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId()); |
| return; |
| } |
| |
| // Then we need to move the connection over. |
| String connectionString = getConnectionString(targetDTO.ids); |
| if( connectionString==null ) { |
| LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids); |
| return; |
| } |
| |
| LOG.info("Redirecting connection to: " + connectionString); |
| TransportConnection connection = (TransportConnection)monitor.context.getConnection(); |
| ConnectionControl cc = new ConnectionControl(); |
| cc.setConnectedBrokers(connectionString); |
| cc.setRebalanceConnection(true); |
| connection.dispatchAsync(cc); |
| } |
| |
| protected String getConnectionString(HashSet<String> ids) { |
| StringBuilder rc = new StringBuilder(); |
| for (String id : ids) { |
| String url = plugin.getBrokerURL(this, id); |
| if( url!=null ) { |
| if( rc.length()!=0 ) { |
| rc.append(','); |
| } |
| rc.append(url); |
| } |
| } |
| if( rc.length()==0 ) |
| return null; |
| return rc.toString(); |
| } |
| |
| static private class Score { |
| int value; |
| } |
| |
| protected Target pickBestBroker(ConnectionMonitor monitor) { |
| |
| if( getConfig() ==null ) |
| return null; |
| |
| if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) { |
| TransportConnection connection = (TransportConnection)monitor.context.getConnection(); |
| Transport transport = connection.getTransport(); |
| Socket socket = transport.narrow(Socket.class); |
| if( socket !=null ) { |
| SocketAddress address = socket.getRemoteSocketAddress(); |
| if( address instanceof InetSocketAddress) { |
| String ip = ((InetSocketAddress) address).getAddress().getHostAddress(); |
| Target targetDTO = getConfig().bySourceIp.get(ip); |
| if( targetDTO!=null ) { |
| return targetDTO; |
| } |
| } |
| } |
| } |
| |
| if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) { |
| String userName = monitor.context.getUserName(); |
| if( userName !=null ) { |
| Target targetDTO = getConfig().byUserName.get(userName); |
| if( targetDTO!=null ) { |
| return targetDTO; |
| } |
| } |
| } |
| |
| if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) { |
| String clientId = monitor.context.getClientId(); |
| if( clientId!=null ) { |
| Target targetDTO = getConfig().byClientId.get(clientId); |
| if( targetDTO!=null ) { |
| return targetDTO; |
| } |
| } |
| } |
| |
| if( |
| (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty()) |
| || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) |
| ) { |
| |
| // Collect the destinations the connection is consuming from... |
| HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>(); |
| for (SessionState session : monitor.context.getConnectionState().getSessionStates()) { |
| for (ConsumerState consumer : session.getConsumerStates()) { |
| ActiveMQDestination destination = consumer.getInfo().getDestination(); |
| if( destination.isComposite() ) { |
| dests.addAll(Arrays.asList(destination.getCompositeDestinations())); |
| } else { |
| dests.addAll(Collections.singletonList(destination)); |
| } |
| } |
| } |
| |
| // Group them by the partitioning target for the destinations and score them.. |
| HashMap<Target, Score> targetScores = new HashMap<Target, Score>(); |
| for (ActiveMQDestination dest : dests) { |
| Target target = getTarget(dest); |
| if( target!=null ) { |
| Score score = targetScores.get(target); |
| if( score == null ) { |
| score = new Score(); |
| targetScores.put(target, score); |
| } |
| score.value++; |
| } |
| } |
| |
| // The target with largest score wins.. |
| if (!targetScores.isEmpty()) { |
| Target bestTarget = null; |
| int bestScore = 0; |
| for (Map.Entry<Target, Score> entry : targetScores.entrySet()) { |
| if (entry.getValue().value > bestScore) { |
| bestTarget = entry.getKey(); |
| bestScore = entry.getValue().value; |
| } |
| } |
| return bestTarget; |
| } |
| |
| // If we get here is because there were no consumers, or the destinations for those |
| // consumers did not have an assigned destination.. So partition based on producer |
| // usage. |
| Target best = monitor.findBestProducerTarget(this); |
| if( best!=null ) { |
| return best; |
| } |
| } |
| return null; |
| } |
| |
| protected Target getTarget(ActiveMQDestination dest) { |
| Partitioning config = getConfig(); |
| if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) { |
| return config.byQueue.get(dest.getPhysicalName()); |
| } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) { |
| return config.byTopic.get(dest.getPhysicalName()); |
| } |
| return null; |
| } |
| |
| protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>(); |
| |
| @Override |
| public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { |
| if( info.isFaultTolerant() ) { |
| ConnectionMonitor monitor = new ConnectionMonitor(context); |
| monitors.put(info.getConnectionId(), monitor); |
| super.addConnection(context, info); |
| checkTarget(monitor); |
| } else { |
| super.addConnection(context, info); |
| } |
| } |
| |
| @Override |
| public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { |
| super.removeConnection(context, info, error); |
| if( info.isFaultTolerant() ) { |
| monitors.remove(info.getConnectionId()); |
| } |
| } |
| |
| @Override |
| public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { |
| ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId()); |
| if( monitor!=null ) { |
| monitor.onSend(producerExchange, messageSend); |
| } |
| } |
| |
| protected Partitioning getConfig() { |
| return plugin.getConfig(); |
| } |
| |
| |
| static class Traffic { |
| long messages; |
| long bytes; |
| } |
| |
| static class ConnectionMonitor { |
| |
| final ConnectionContext context; |
| LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>(); |
| |
| public ConnectionMonitor(ConnectionContext context) { |
| this.context = context; |
| } |
| |
| synchronized public Target findBestProducerTarget(PartitionBroker broker) { |
| Target best = null; |
| long bestSize = 0 ; |
| for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) { |
| Traffic t = entry.getValue(); |
| // Once we get enough messages... |
| if( t.messages < broker.plugin.getMinTransferCount()) { |
| continue; |
| } |
| if( t.bytes > bestSize) { |
| bestSize = t.bytes; |
| Target target = broker.getTarget(entry.getKey()); |
| if( target!=null ) { |
| best = target; |
| } |
| } |
| } |
| return best; |
| } |
| |
| synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) { |
| ActiveMQDestination dest = message.getDestination(); |
| Traffic traffic = trafficPerDestination.get(dest); |
| if( traffic == null ) { |
| traffic = new Traffic(); |
| trafficPerDestination.put(dest, traffic); |
| } |
| traffic.messages += 1; |
| traffic.bytes += message.getSize(); |
| } |
| |
| |
| } |
| |
| } |