[AMQ-9259] Remove activemq-partition and zookeeper test dependency
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 03d29cd..7b99b34 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -68,10 +68,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-http</artifactId>
</dependency>
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>activemq-partition</artifactId>
- </dependency>
<!-- Additional Dependencies. -->
<dependency>
@@ -187,7 +183,6 @@
org.codehaus.jettison*;resolution:=optional,
org.jasypt*;resolution:=optional,
org.eclipse.jetty*;resolution:=optional;version="[9.0,10)",
- org.apache.zookeeper*;resolution:=optional,
org.fusesource.hawtjni*;resolution:=optional,
org.springframework.jms*;version="[4,6)";resolution:=optional,
org.springframework.transaction*;version="[4,6)";resolution:=optional,
diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml
deleted file mode 100644
index 901c869..0000000
--- a/activemq-partition/pom.xml
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-parent</artifactId>
- <version>5.19.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>activemq-partition</artifactId>
- <packaging>jar</packaging>
-
- <name>ActiveMQ :: Partition Management</name>
- <description>Used to partition clients over a cluster of brokers</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.zookeeper-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.util-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
-
- <!-- For Optional Snappy Compression -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <!-- Testing Dependencies -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j2-impl</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>activemq.tests-sanity</id>
- <activation>
- <property>
- <name>activemq.tests</name>
- <value>smoke</value>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <includes>
- <include>**/PartitionBrokerTest.*</include>
- </includes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>activemq.tests-autoTransport</id>
- <activation>
- <property>
- <name>activemq.tests</name>
- <value>autoTransport</value>
- </property>
- </activation>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- </profiles>
-</project>
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
deleted file mode 100644
index 9362e64..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerFilter;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.activemq.partition.dto.Target;
-import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.LRUCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A BrokerFilter which partitions client connections over a cluster of brokers.
- *
- * It can use a client identifier like client id, authenticated user name, source ip
- * address or even destination being used by the connection to figure out which
- * is the best broker in the cluster that the connection should be using and then
- * redirects failover clients to that broker.
- */
-public class PartitionBroker extends BrokerFilter {
-
- protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
- protected final PartitionBrokerPlugin plugin;
- protected boolean reloadConfigOnPoll = true;
-
- public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
- super(broker);
- this.plugin = plugin;
- }
-
- @Override
- public void start() throws Exception {
- super.start();
- getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setName("Partition Monitor");
- onMonitorStart();
- try {
- runPartitionMonitor();
- } catch (Exception e) {
- onMonitorStop();
- }
- }
- });
- }
-
- protected void onMonitorStart() {
- }
- protected void onMonitorStop() {
- }
-
- protected void runPartitionMonitor() {
- while( !isStopped() ) {
- try {
- monitorWait();
- } catch (InterruptedException e) {
- break;
- }
-
- if(reloadConfigOnPoll) {
- try {
- reloadConfiguration();
- } catch (Exception e) {
- continue;
- }
- }
-
- for( ConnectionMonitor monitor: monitors.values()) {
- checkTarget(monitor);
- }
- }
- }
-
- protected void monitorWait() throws InterruptedException {
- synchronized (this) {
- this.wait(1000);
- }
- }
-
- protected void monitorWakeup() {
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- protected void reloadConfiguration() throws Exception {
- }
-
- protected void checkTarget(ConnectionMonitor monitor) {
-
- // can we find a preferred target for the connection?
- Target targetDTO = pickBestBroker(monitor);
- if( targetDTO == null || targetDTO.ids==null) {
- LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
- return;
- }
-
- // Are we one the the targets?
- if( targetDTO.ids.contains(getBrokerName()) ) {
- LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
- return;
- }
-
- // Then we need to move the connection over.
- String connectionString = getConnectionString(targetDTO.ids);
- if( connectionString==null ) {
- LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
- return;
- }
-
- LOG.info("Redirecting connection to: " + connectionString);
- TransportConnection connection = (TransportConnection)monitor.context.getConnection();
- ConnectionControl cc = new ConnectionControl();
- cc.setConnectedBrokers(connectionString);
- cc.setRebalanceConnection(true);
- connection.dispatchAsync(cc);
- }
-
- protected String getConnectionString(HashSet<String> ids) {
- StringBuilder rc = new StringBuilder();
- for (String id : ids) {
- String url = plugin.getBrokerURL(this, id);
- if( url!=null ) {
- if( rc.length()!=0 ) {
- rc.append(',');
- }
- rc.append(url);
- }
- }
- if( rc.length()==0 )
- return null;
- return rc.toString();
- }
-
- static private class Score {
- int value;
- }
-
- protected Target pickBestBroker(ConnectionMonitor monitor) {
-
- if( getConfig() ==null )
- return null;
-
- if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
- TransportConnection connection = (TransportConnection)monitor.context.getConnection();
- Transport transport = connection.getTransport();
- Socket socket = transport.narrow(Socket.class);
- if( socket !=null ) {
- SocketAddress address = socket.getRemoteSocketAddress();
- if( address instanceof InetSocketAddress) {
- String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
- Target targetDTO = getConfig().bySourceIp.get(ip);
- if( targetDTO!=null ) {
- return targetDTO;
- }
- }
- }
- }
-
- if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
- String userName = monitor.context.getUserName();
- if( userName !=null ) {
- Target targetDTO = getConfig().byUserName.get(userName);
- if( targetDTO!=null ) {
- return targetDTO;
- }
- }
- }
-
- if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
- String clientId = monitor.context.getClientId();
- if( clientId!=null ) {
- Target targetDTO = getConfig().byClientId.get(clientId);
- if( targetDTO!=null ) {
- return targetDTO;
- }
- }
- }
-
- if(
- (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
- || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
- ) {
-
- // Collect the destinations the connection is consuming from...
- HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
- for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
- for (ConsumerState consumer : session.getConsumerStates()) {
- ActiveMQDestination destination = consumer.getInfo().getDestination();
- if( destination.isComposite() ) {
- dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
- } else {
- dests.addAll(Collections.singletonList(destination));
- }
- }
- }
-
- // Group them by the partitioning target for the destinations and score them..
- HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
- for (ActiveMQDestination dest : dests) {
- Target target = getTarget(dest);
- if( target!=null ) {
- Score score = targetScores.get(target);
- if( score == null ) {
- score = new Score();
- targetScores.put(target, score);
- }
- score.value++;
- }
- }
-
- // The target with largest score wins..
- if (!targetScores.isEmpty()) {
- Target bestTarget = null;
- int bestScore = 0;
- for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
- if (entry.getValue().value > bestScore) {
- bestTarget = entry.getKey();
- bestScore = entry.getValue().value;
- }
- }
- return bestTarget;
- }
-
- // If we get here is because there were no consumers, or the destinations for those
- // consumers did not have an assigned destination.. So partition based on producer
- // usage.
- Target best = monitor.findBestProducerTarget(this);
- if( best!=null ) {
- return best;
- }
- }
- return null;
- }
-
- protected Target getTarget(ActiveMQDestination dest) {
- Partitioning config = getConfig();
- if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
- return config.byQueue.get(dest.getPhysicalName());
- } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
- return config.byTopic.get(dest.getPhysicalName());
- }
- return null;
- }
-
- protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
-
- @Override
- public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
- if( info.isFaultTolerant() ) {
- ConnectionMonitor monitor = new ConnectionMonitor(context);
- monitors.put(info.getConnectionId(), monitor);
- super.addConnection(context, info);
- checkTarget(monitor);
- } else {
- super.addConnection(context, info);
- }
- }
-
- @Override
- public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
- super.removeConnection(context, info, error);
- if( info.isFaultTolerant() ) {
- monitors.remove(info.getConnectionId());
- }
- }
-
- @Override
- public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
- ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
- if( monitor!=null ) {
- monitor.onSend(producerExchange, messageSend);
- }
- }
-
- protected Partitioning getConfig() {
- return plugin.getConfig();
- }
-
-
- static class Traffic {
- long messages;
- long bytes;
- }
-
- static class ConnectionMonitor {
-
- final ConnectionContext context;
- LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
-
- public ConnectionMonitor(ConnectionContext context) {
- this.context = context;
- }
-
- synchronized public Target findBestProducerTarget(PartitionBroker broker) {
- Target best = null;
- long bestSize = 0 ;
- for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
- Traffic t = entry.getValue();
- // Once we get enough messages...
- if( t.messages < broker.plugin.getMinTransferCount()) {
- continue;
- }
- if( t.bytes > bestSize) {
- bestSize = t.bytes;
- Target target = broker.getTarget(entry.getKey());
- if( target!=null ) {
- best = target;
- }
- }
- }
- return best;
- }
-
- synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
- ActiveMQDestination dest = message.getDestination();
- Traffic traffic = trafficPerDestination.get(dest);
- if( traffic == null ) {
- traffic = new Traffic();
- trafficPerDestination.put(dest, traffic);
- }
- traffic.messages += 1;
- traffic.bytes += message.getSize();
- }
-
-
- }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
deleted file mode 100644
index 418f564..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.partition.dto.Partitioning;
-
-import java.io.IOException;
-
-/**
- * A BrokerPlugin which partitions client connections over a cluster of brokers.
- *
- * @org.apache.xbean.XBean element="partitionBrokerPlugin"
- */
-public class PartitionBrokerPlugin implements BrokerPlugin {
-
- protected int minTransferCount;
- protected Partitioning config;
-
- @Override
- public Broker installPlugin(Broker broker) throws Exception {
- return new PartitionBroker(broker, this);
- }
-
- public int getMinTransferCount() {
- return minTransferCount;
- }
-
- public void setMinTransferCount(int minTransferCount) {
- this.minTransferCount = minTransferCount;
- }
-
- public Partitioning getConfig() {
- return config;
- }
-
- public void setConfig(Partitioning config) {
- this.config = config;
- }
-
- public void setConfigAsJson(String config) throws IOException {
- this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
- }
-
- public String getBrokerURL(PartitionBroker partitionBroker, String id) {
- if( config!=null && config.brokers!=null ) {
- return config.brokers.get(id);
- }
- return null;
- }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java
deleted file mode 100644
index 2baec62..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.data.Stat;
-import org.linkedin.util.clock.Clock;
-import org.linkedin.util.clock.SystemClock;
-import org.linkedin.util.clock.Timespan;
-import org.linkedin.util.concurrent.ConcurrentUtils;
-import org.linkedin.util.io.PathUtils;
-import org.linkedin.zookeeper.client.*;
-import org.slf4j.Logger;
-
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
-
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);
-
- private Map<String, String> acls;
- private String password;
-
- public void start() throws Exception {
- // Grab the lock to make sure that the registration of the ManagedService
- // won't be updated immediately but that the initial update will happen first
- synchronized (_lock) {
- _stateChangeDispatcher.setDaemon(true);
- _stateChangeDispatcher.start();
- doStart();
- }
- }
-
- public void setACLs(Map<String, String> acls) {
- this.acls = acls;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- protected void doStart() throws UnsupportedEncodingException {
- connect();
- }
-
- @Override
- public void close() {
- if (_stateChangeDispatcher != null) {
- _stateChangeDispatcher.end();
- try {
- _stateChangeDispatcher.join(1000);
- } catch (Exception e) {
- LOG.debug("ignored exception", e);
- }
- }
- synchronized(_lock) {
- if (_zk != null) {
- try {
- changeState(State.NONE);
- _zk.close();
- Thread th = getSendThread();
- if (th != null) {
- th.join(1000);
- }
- _zk = null;
- } catch (Exception e) {
- LOG.debug("ignored exception", e);
- }
- }
- }
- }
-
- protected Thread getSendThread() {
- try {
- return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
- } catch (Throwable e) {
- return null;
- }
- }
-
- protected Object getField(Object obj, String... names) throws Exception {
- for (String name : names) {
- obj = getField(obj, name);
- }
- return obj;
- }
-
- protected Object getField(Object obj, String name) throws Exception {
- Class clazz = obj.getClass();
- while (clazz != null) {
- for (Field f : clazz.getDeclaredFields()) {
- if (f.getName().equals(name)) {
- f.setAccessible(true);
- return f.get(obj);
- }
- }
- }
- throw new NoSuchFieldError(name);
- }
-
- protected void changeState(State newState) {
- synchronized (_lock) {
- State oldState = _state;
- if (oldState != newState) {
- _stateChangeDispatcher.addEvent(oldState, newState);
- _state = newState;
- _lock.notifyAll();
- }
- }
- }
-
- public void testGenerateConnectionLoss() throws Exception {
- waitForConnected();
- Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
- callMethod(clientCnxnSocket, "testableCloseSocket");
- }
-
- protected Object callMethod(Object obj, String name, Object... args) throws Exception {
- Class clazz = obj.getClass();
- while (clazz != null) {
- for (Method m : clazz.getDeclaredMethods()) {
- if (m.getName().equals(name)) {
- m.setAccessible(true);
- return m.invoke(obj, args);
- }
- }
- }
- throw new NoSuchMethodError(name);
- }
-
- protected void tryConnect() {
- synchronized (_lock) {
- try {
- connect();
- } catch (Throwable e) {
- LOG.warn("Error while restarting:", e);
- if (_expiredSessionRecovery == null) {
- _expiredSessionRecovery = new ExpiredSessionRecovery();
- _expiredSessionRecovery.setDaemon(true);
- _expiredSessionRecovery.start();
- }
- }
- }
- }
-
- public void connect() throws UnsupportedEncodingException {
- synchronized (_lock) {
- changeState(State.CONNECTING);
- _zk = _factory.createZooKeeper(this);
- if (password != null) {
- _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
- }
- }
- }
-
- public void process(WatchedEvent event) {
- if (event.getState() != null) {
- LOG.debug("event: {}", event.getState());
- synchronized (_lock) {
- switch(event.getState()) {
- case SyncConnected:
- changeState(State.CONNECTED);
- break;
- case Disconnected:
- if (_state != State.NONE) {
- changeState(State.RECONNECTING);
- }
- break;
- case Expired:
- // when expired, the zookeeper object is invalid and we need to recreate a new one
- _zk = null;
- LOG.warn("Expiration detected: trying to restart...");
- tryConnect();
- break;
- default:
- LOG.warn("Unsupported event state: {}", event.getState());
- }
- }
- }
- }
-
- @Override
- protected IZooKeeper getZk() {
- State state = _state;
- if (state == State.NONE) {
- throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
- } else if (state != State.CONNECTING) {
- try {
- waitForConnected();
- } catch (Exception e) {
- throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
- }
- }
- IZooKeeper zk = _zk;
- if (zk == null) {
- throw new IllegalStateException("No ZooKeeper connection available");
- }
- return zk;
- }
-
- public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
- waitForState(State.CONNECTED, timeout);
- }
-
- public void waitForConnected() throws InterruptedException, TimeoutException {
- waitForConnected(null);
- }
-
- public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
- long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
- if (_state != state) {
- synchronized (_lock) {
- while (_state != state) {
- ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
- }
- }
- }
- }
-
- @Override
- public void registerListener(LifecycleListener listener) {
- if (listener == null) {
- throw new IllegalStateException("listener is null");
- }
- if (!_listeners.contains(listener)) {
- _listeners.add(listener);
- }
- if (_state == State.CONNECTED) {
- listener.onConnected();
- //_stateChangeDispatcher.addEvent(null, State.CONNECTED);
- }
- }
-
- @Override
- public void removeListener(LifecycleListener listener) {
- if (listener == null) {
- throw new IllegalStateException("listener is null");
- }
- _listeners.remove(listener);
- }
-
- @Override
- public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
- return new ChrootedZKClient(this, adjustPath(path));
- }
-
- @Override
- public boolean isConnected() {
- return _state == State.CONNECTED;
- }
-
- public boolean isConfigured() {
- return _state != State.NONE;
- }
-
- @Override
- public String getConnectString() {
- return _factory.getConnectString();
- }
-
- public static enum State {
- NONE,
- CONNECTING,
- CONNECTED,
- RECONNECTING
- }
-
- private final static String CHARSET = "UTF-8";
-
- private final Clock _clock = SystemClock.instance();
- private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();
-
- protected final Object _lock = new Object();
- protected volatile State _state = State.NONE;
-
- private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
-
- protected IZooKeeperFactory _factory;
- protected IZooKeeper _zk;
- protected Timespan _reconnectTimeout = Timespan.parse("20s");
- protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
-
- private ExpiredSessionRecovery _expiredSessionRecovery = null;
-
- private class StateChangeDispatcher extends Thread {
- private final AtomicBoolean _running = new AtomicBoolean(true);
- private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();
-
- private StateChangeDispatcher() {
- super("ZooKeeper state change dispatcher thread");
- }
-
- @Override
- public void run() {
- Map<Object, Boolean> history = new IdentityHashMap<>();
- LOG.info("Starting StateChangeDispatcher");
- while (_running.get()) {
- Boolean isConnectedEvent;
- try {
- isConnectedEvent = _events.take();
- } catch (InterruptedException e) {
- continue;
- }
- if (!_running.get() || isConnectedEvent == null) {
- continue;
- }
- Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
- // we save which event each listener has seen last
- // we don't update the map in place because we need to get rid of unregistered listeners
- history = newHistory;
- }
- LOG.info("StateChangeDispatcher terminated.");
- }
-
- public void end() {
- _running.set(false);
- _events.add(false);
- }
-
- public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
- LOG.debug("addEvent: {} => {}", oldState, newState);
- if (newState == ZKClient.State.CONNECTED) {
- _events.add(true);
- } else if (oldState == ZKClient.State.CONNECTED) {
- _events.add(false);
- }
- }
- }
-
- protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
- Map<Object, Boolean> newHistory = new IdentityHashMap<>();
- for (LifecycleListener listener : _listeners) {
- Boolean previousEvent = history.get(listener);
- // we propagate the event only if it was not already sent
- if (previousEvent == null || previousEvent != connectedEvent) {
- try {
- if (connectedEvent) {
- listener.onConnected();
- } else {
- listener.onDisconnected();
- }
- } catch (Throwable e) {
- LOG.warn("Exception while executing listener (ignored)", e);
- }
- }
- newHistory.put(listener, connectedEvent);
- }
- return newHistory;
- }
-
- private class ExpiredSessionRecovery extends Thread {
-
- private ExpiredSessionRecovery() {
- super("ZooKeeper expired session recovery thread");
- }
-
- @Override
- public void run() {
- LOG.info("Entering recovery mode");
- synchronized (_lock) {
- try {
- int count = 0;
- while (_state == ZKClient.State.NONE) {
- try {
- count++;
- LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
- ZKClient.this.connect();
- } catch (Throwable e) {
- LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
- try {
- _lock.wait(_reconnectTimeout.getDurationInMilliseconds());
- } catch (InterruptedException e1) {
- throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
- }
- }
- }
- } finally {
- _expiredSessionRecovery = null;
- LOG.info("Exiting recovery mode.");
- }
- }
- }
-
- }
-
- public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
- this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
- }
-
- public ZKClient(IZooKeeperFactory factory) {
- this(factory, null);
- }
-
- public ZKClient(IZooKeeperFactory factory, String chroot) {
- super(chroot);
- _factory = factory;
- Map<String, String> acls = new HashMap<>();
- acls.put("/", "world:anyone:acdrw");
- setACLs(acls);
- }
-
- static private int getPermFromString(String permString) {
- int perm = 0;
- for (int i = 0; i < permString.length(); i++) {
- switch (permString.charAt(i)) {
- case 'r':
- perm |= ZooDefs.Perms.READ;
- break;
- case 'w':
- perm |= ZooDefs.Perms.WRITE;
- break;
- case 'c':
- perm |= ZooDefs.Perms.CREATE;
- break;
- case 'd':
- perm |= ZooDefs.Perms.DELETE;
- break;
- case 'a':
- perm |= ZooDefs.Perms.ADMIN;
- break;
- default:
- System.err.println("Unknown perm type:" + permString.charAt(i));
- }
- }
- return perm;
- }
-
- private static List<ACL> parseACLs(String aclString) {
- List<ACL> acl;
- String acls[] = aclString.split(",");
- acl = new ArrayList<>();
- for (String a : acls) {
- int firstColon = a.indexOf(':');
- int lastColon = a.lastIndexOf(':');
- if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
- System.err.println(a + " does not have the form scheme:id:perm");
- continue;
- }
- ACL newAcl = new ACL();
- newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
- newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
- acl.add(newAcl);
- }
- return acl;
- }
-
- public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
- if (exists(path) != null) {
- return setByteData(path, data);
- }
- try {
- createBytesNodeWithParents(path, data, acl, createMode);
- return null;
- } catch (KeeperException.NodeExistsException e) {
- // this should not happen very often (race condition)
- return setByteData(path, data);
- }
- }
-
- public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
- return create(path, (byte[]) null, createMode);
- }
-
- public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
- return create(path, toByteData(data), createMode);
- }
-
- public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
- return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
- }
-
- public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
- return createWithParents(path, (byte[]) null, createMode);
- }
-
- public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
- return createWithParents(path, toByteData(data), createMode);
- }
-
- public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
- createParents(path);
- return create(path, data, createMode);
- }
-
- public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
- return createOrSetWithParents(path, toByteData(data), createMode);
- }
-
- public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
- if (exists(path) != null) {
- return setByteData(path, data);
- }
- try {
- createWithParents(path, data, createMode);
- return null;
- } catch (KeeperException.NodeExistsException e) {
- // this should not happen very often (race condition)
- return setByteData(path, data);
- }
- }
-
- public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
- if (exists(path) != null) {
- doFixACLs(path, recursive);
- }
- }
-
- private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
- setACL(path, getNodeACLs(path), -1);
- if (recursive) {
- for (String child : getChildren(path)) {
- doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
- }
- }
- }
-
- private List<ACL> getNodeACLs(String path) {
- String acl = doGetNodeACLs(adjustPath(path));
- if (acl == null) {
- throw new IllegalStateException("Could not find matching ACLs for " + path);
- }
- return parseACLs(acl);
- }
-
- protected String doGetNodeACLs(String path) {
- String longestPath = "";
- for (String acl : acls.keySet()) {
- if (acl.length() > longestPath.length() && path.startsWith(acl)) {
- longestPath = acl;
- }
- }
- return acls.get(longestPath);
- }
-
- private void createParents(String path) throws InterruptedException, KeeperException {
- path = PathUtils.getParentPath(adjustPath(path));
- path = PathUtils.removeTrailingSlash(path);
- List<String> paths = new ArrayList<>();
- while (!path.equals("") && getZk().exists(path, false) == null) {
- paths.add(path);
- path = PathUtils.getParentPath(path);
- path = PathUtils.removeTrailingSlash(path);
- }
- Collections.reverse(paths);
- for (String p : paths) {
- try {
- getZk().create(p,
- null,
- getNodeACLs(p),
- CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException e) {
- // ok we continue...
- if (LOG.isDebugEnabled()) {
- LOG.debug("parent already exists " + p);
- }
- }
- }
- }
-
- private byte[] toByteData(String data) {
- if (data == null) {
- return null;
- } else {
- try {
- return data.getBytes(CHARSET);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
deleted file mode 100644
index 6d2474b..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.linkedin.util.clock.Timespan;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class ZooKeeperPartitionBroker extends PartitionBroker {
-
- protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
-
- protected volatile ZKClient zk_client = null;
- protected volatile Partitioning config;
- protected final CountDownLatch configAcquired = new CountDownLatch(1);
-
- public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
- super(broker, plugin);
- }
-
- @Override
- public void start() throws Exception {
- super.start();
- // Lets block a bit until we get our config.. Otherwise just keep
- // on going.. not a big deal if we get our config later. Perhaps
- // ZK service is not having a good day.
- configAcquired.await(5, TimeUnit.SECONDS);
- }
-
- @Override
- protected void onMonitorStop() {
- zkDisconnect();
- }
-
- @Override
- protected Partitioning getConfig() {
- return config;
- }
-
- protected ZooKeeperPartitionBrokerPlugin plugin() {
- return (ZooKeeperPartitionBrokerPlugin)plugin;
- }
-
- protected void zkConnect() throws Exception {
- zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
- if( plugin().getZkPassword()!=null ) {
- zk_client.setPassword(plugin().getZkPassword());
- }
- zk_client.start();
- zk_client.waitForConnected(Timespan.parse("30s"));
- }
-
- protected void zkDisconnect() {
- if( zk_client!=null ) {
- zk_client.close();
- zk_client = null;
- }
- }
-
- protected void reloadConfiguration() throws Exception {
- if( zk_client==null ) {
- LOG.debug("Connecting to ZooKeeper");
- try {
- zkConnect();
- LOG.debug("Connected to ZooKeeper");
- } catch (Exception e) {
- LOG.debug("Connection to ZooKeeper failed: "+e);
- zkDisconnect();
- throw e;
- }
- }
-
- byte[] data = null;
- try {
- Stat stat = new Stat();
- data = zk_client.getData(plugin().getZkPath(), new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
- try {
- reloadConfiguration();
- } catch (Exception e) {
- }
- monitorWakeup();
- }
- }, stat);
- configAcquired.countDown();
- reloadConfigOnPoll = false;
- } catch (Exception e) {
- LOG.warn("Could load partitioning configuration: " + e, e);
- reloadConfigOnPoll = true;
- }
-
- try {
- config = Partitioning.MAPPER.readValue(data, Partitioning.class);
- } catch (Exception e) {
- LOG.warn("Invalid partitioning configuration: " + e, e);
- }
- }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
deleted file mode 100644
index 34fa0fc..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerPlugin;
-
-/**
- * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
- */
-public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
-
- String zkAddress = "127.0.0.1:2181";
- String zkPassword;
- String zkPath = "/broker-assignments";
- String zkSessionTmeout = "10s";
-
- @Override
- public Broker installPlugin(Broker broker) throws Exception {
- return new ZooKeeperPartitionBroker(broker, this);
- }
-
- public String getZkAddress() {
- return zkAddress;
- }
-
- public void setZkAddress(String zkAddress) {
- this.zkAddress = zkAddress;
- }
-
- public String getZkPassword() {
- return zkPassword;
- }
-
- public void setZkPassword(String zkPassword) {
- this.zkPassword = zkPassword;
- }
-
- public String getZkPath() {
- return zkPath;
- }
-
- public void setZkPath(String zkPath) {
- this.zkPath = zkPath;
- }
-
- public String getZkSessionTmeout() {
- return zkSessionTmeout;
- }
-
- public void setZkSessionTmeout(String zkSessionTmeout) {
- this.zkSessionTmeout = zkSessionTmeout;
- }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
deleted file mode 100644
index 43f7924..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition.dto;
-
-
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.DeserializationConfig;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * The main Configuration class for the PartitionBroker plugin
- */
-public class Partitioning {
-
- static final public ObjectMapper MAPPER = new ObjectMapper();
- static {
- MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
- }
-
- static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
- static {
- TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
- TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
- }
-
- /**
- * If a client connects with a clientId which is listed in the
- * map, then he will be immediately reconnected
- * to the partition target immediately.
- */
- @JsonProperty("by_client_id")
- @JsonDeserialize(contentAs = Target.class)
- public HashMap<String, Target> byClientId;
-
- /**
- * If a client connects with a user priciple which is listed in the
- * map, then he will be immediately reconnected
- * to the partition target immediately.
- */
- @JsonProperty("by_user_name")
- @JsonDeserialize(contentAs = Target.class)
- public HashMap<String, Target> byUserName;
-
- /**
- * If a client connects with source ip which is listed in the
- * map, then he will be immediately reconnected
- * to the partition target immediately.
- */
- @JsonProperty("by_source_ip")
- @JsonDeserialize(contentAs = Target.class)
- public HashMap<String, Target> bySourceIp;
-
- /**
- * Used to map the preferred partitioning of queues across
- * a set of brokers. Once a it is deemed that a connection mostly
- * works with a set of targets configured in this map, the client
- * will be reconnected to the appropriate target.
- */
- @JsonProperty("by_queue")
- @JsonDeserialize(contentAs = Target.class)
- public HashMap<String, Target> byQueue;
-
- /**
- * Used to map the preferred partitioning of topics across
- * a set of brokers. Once a it is deemed that a connection mostly
- * works with a set of targets configured in this map, the client
- * will be reconnected to the appropriate target.
- */
- @JsonProperty("by_topic")
- @JsonDeserialize(contentAs = Target.class)
- public HashMap<String, Target> byTopic;
-
- /**
- * Maps broker names to broker URLs.
- */
- @JsonProperty("brokers")
- @JsonDeserialize(contentAs = String.class)
- public HashMap<String, String> brokers;
-
-
- @Override
- public String toString() {
- try {
- return TO_STRING_MAPPER.writeValueAsString(this);
- } catch (IOException e) {
- return super.toString();
- }
- }
-
- public HashMap<String, String> getBrokers() {
- return brokers;
- }
-
- public void setBrokers(HashMap<String, String> brokers) {
- this.brokers = brokers;
- }
-
- public HashMap<String, Target> getByClientId() {
- return byClientId;
- }
-
- public void setByClientId(HashMap<String, Target> byClientId) {
- this.byClientId = byClientId;
- }
-
- public HashMap<String, Target> getByQueue() {
- return byQueue;
- }
-
- public void setByQueue(HashMap<String, Target> byQueue) {
- this.byQueue = byQueue;
- }
-
- public HashMap<String, Target> getBySourceIp() {
- return bySourceIp;
- }
-
- public void setBySourceIp(HashMap<String, Target> bySourceIp) {
- this.bySourceIp = bySourceIp;
- }
-
- public HashMap<String, Target> getByTopic() {
- return byTopic;
- }
-
- public void setByTopic(HashMap<String, Target> byTopic) {
- this.byTopic = byTopic;
- }
-
- public HashMap<String, Target> getByUserName() {
- return byUserName;
- }
-
- public void setByUserName(HashMap<String, Target> byUserName) {
- this.byUserName = byUserName;
- }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
deleted file mode 100644
index 79b53ef..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition.dto;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
-/**
- * Represents a partition target. This identifies the brokers that
- * a partition lives on.
- */
-public class Target {
-
- @JsonProperty("ids")
- public HashSet<String> ids = new HashSet<String>();
-
- public Target() {
- ids = new HashSet<String>();
- }
-
- public Target(String ...ids) {
- this.ids.addAll(java.util.Arrays.asList(ids));
- }
-
- @Override
- public String toString() {
- try {
- return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
- } catch (IOException e) {
- return super.toString();
- }
- }
-
- public HashSet<String> getIds() {
- return ids;
- }
-
- public void setIds(Collection<String> ids) {
- this.ids = new HashSet<String>(ids);
- }
-
-}
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
deleted file mode 100644
index 1b49f0b..0000000
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.activemq.partition.dto.Target;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.jms.*;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit tests for the PartitionBroker plugin.
- */
-public class PartitionBrokerTest {
-
- protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
- protected ArrayList<Connection> connections = new ArrayList<Connection>();
- Partitioning partitioning;
-
- @Before
- public void setUp() throws Exception {
- partitioning = new Partitioning();
- partitioning.brokers = new HashMap<String, String>();
- }
-
- /**
- * Partitioning can only re-direct failover clients since those
- * can re-connect and re-establish their state with another broker.
- */
- @Test(timeout = 1000*60*60)
- public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
-
- partitioning.byClientId = new HashMap<String, Target>();
- partitioning.byClientId.put("client1", new Target("broker1"));
- createBrokerCluster(2);
-
- Connection connection = createConnectionToUrl(getConnectURL("broker2"));
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(0, getTransportConnector("broker1").getConnections().size());
- assertEquals(1, getTransportConnector("broker2").getConnections().size());
- }
- });
-
- connection.setClientID("client1");
- connection.start();
-
- Thread.sleep(1000);
- assertEquals(0, getTransportConnector("broker1").getConnections().size());
- assertEquals(1, getTransportConnector("broker2").getConnections().size());
- }
-
- @Test(timeout = 1000*60*60)
- public void testPartitionByClientId() throws Exception {
- partitioning.byClientId = new HashMap<String, Target>();
- partitioning.byClientId.put("client1", new Target("broker1"));
- partitioning.byClientId.put("client2", new Target("broker2"));
- createBrokerCluster(2);
-
- Connection connection = createConnectionTo("broker2");
-
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(0, getTransportConnector("broker1").getConnections().size());
- assertEquals(1, getTransportConnector("broker2").getConnections().size());
- }
- });
-
- connection.setClientID("client1");
- connection.start();
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(1, getTransportConnector("broker1").getConnections().size());
- assertEquals(0, getTransportConnector("broker2").getConnections().size());
- }
- });
- }
-
- @Test(timeout = 1000*60*60)
- public void testPartitionByQueue() throws Exception {
- partitioning.byQueue = new HashMap<String, Target>();
- partitioning.byQueue.put("foo", new Target("broker1"));
- createBrokerCluster(2);
-
- Connection connection2 = createConnectionTo("broker2");
-
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(0, getTransportConnector("broker1").getConnections().size());
- assertEquals(1, getTransportConnector("broker2").getConnections().size());
- }
- });
-
- Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
-
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(1, getTransportConnector("broker1").getConnections().size());
- assertEquals(0, getTransportConnector("broker2").getConnections().size());
- }
- });
-
- Connection connection1 = createConnectionTo("broker2");
- Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
-
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(1, getTransportConnector("broker1").getConnections().size());
- assertEquals(1, getTransportConnector("broker2").getConnections().size());
- }
- });
-
- for (int i = 0; i < 100; i++) {
- producer.send(session1.createTextMessage("#" + i));
- }
-
- within(5, TimeUnit.SECONDS, new Task() {
- public void run() throws Exception {
- assertEquals(2, getTransportConnector("broker1").getConnections().size());
- assertEquals(0, getTransportConnector("broker2").getConnections().size());
- }
- });
- }
-
-
- static interface Task {
- public void run() throws Exception;
- }
-
- private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
- long timeMS = unit.toMillis(time);
- long deadline = System.currentTimeMillis() + timeMS;
- while (true) {
- try {
- task.run();
- return;
- } catch (Throwable e) {
- long remaining = deadline - System.currentTimeMillis();
- if( remaining <=0 ) {
- if( e instanceof RuntimeException ) {
- throw (RuntimeException)e;
- }
- if( e instanceof Error ) {
- throw (Error)e;
- }
- throw new RuntimeException(e);
- }
- Thread.sleep(Math.min(timeMS/10, remaining));
- }
- }
- }
-
- protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
- return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false");
- }
-
- private Connection createConnectionToUrl(String url) throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
- Connection connection = factory.createConnection();
- connections.add(connection);
- return connection;
- }
-
- protected String getConnectURL(String broker) throws IOException, URISyntaxException {
- TransportConnector tcp = getTransportConnector(broker);
- return tcp.getConnectUri().toString();
- }
-
- private TransportConnector getTransportConnector(String broker) {
- BrokerService brokerService = brokers.get(broker);
- if( brokerService==null ) {
- throw new IllegalArgumentException("Invalid broker id");
- }
- return brokerService.getTransportConnectorByName("tcp");
- }
-
- protected void createBrokerCluster(int brokerCount) throws Exception {
- for (int i = 1; i <= brokerCount; i++) {
- String brokerId = "broker" + i;
- BrokerService broker = createBroker(brokerId);
- broker.setPersistent(false);
- broker.addConnector("tcp://localhost:0").setName("tcp");
- addPartitionBrokerPlugin(broker);
- broker.start();
- broker.waitUntilStarted();
- partitioning.brokers.put(brokerId, getConnectURL(brokerId));
- }
- }
-
- protected void addPartitionBrokerPlugin(BrokerService broker) {
- PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
- plugin.setConfig(partitioning);
- broker.setPlugins(new BrokerPlugin[]{plugin});
- }
-
- protected BrokerService createBroker(String name) {
- BrokerService broker = new BrokerService();
- broker.setBrokerName(name);
- brokers.put(name, broker);
- return broker;
- }
-
- @After
- public void tearDown() throws Exception {
- for (Connection connection : connections) {
- try {
- connection.close();
- } catch (Throwable e) {
- }
- }
- connections.clear();
- for (BrokerService broker : brokers.values()) {
- try {
- broker.stop();
- broker.waitUntilStopped();
- } catch (Throwable e) {
- }
- }
- brokers.clear();
- }
-
-}
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
deleted file mode 100644
index 0a6416b..0000000
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.junit.After;
-import org.junit.Before;
-import org.linkedin.util.clock.Timespan;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-
-/**
- */
-public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
-
- NIOServerCnxnFactory connector;
-
- @Before
- public void setUp() throws Exception {
- System.out.println("Starting ZooKeeper");
- ZooKeeperServer zk_server = new ZooKeeperServer();
- zk_server.setTickTime(500);
- zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
- connector = new NIOServerCnxnFactory();
- connector.configure(new InetSocketAddress(0), 100);
- connector.startup(zk_server);
- System.out.println("ZooKeeper started");
- super.setUp();
- }
-
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- if( connector!=null ) {
- connector.shutdown();
- connector = null;
- }
- }
-
- String zkPath = "/partition-config";
-
- @Override
- protected void createBrokerCluster(int brokerCount) throws Exception {
- // Store the partitioning in ZK.
- ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
- try {
- zk_client.start();
- zk_client.waitForConnected(Timespan.parse("30s"));
- try {
- zk_client.delete(zkPath);
- } catch (Throwable e) {
- }
- zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
- } finally {
- zk_client.close();
- }
- super.createBrokerCluster(brokerCount);
- }
-
- @Override
- protected void addPartitionBrokerPlugin(BrokerService broker) {
- // Have the borker plugin get the partition config via ZK.
- ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
- @Override
- public String getBrokerURL(PartitionBroker partitionBroker, String id) {
- try {
- return getConnectURL(id);
- } catch (Exception e) {
- return null;
- }
- }
- };
- plugin.setZkAddress("localhost:" + connector.getLocalPort());
- plugin.setZkPath(zkPath);
- broker.setPlugins(new BrokerPlugin[]{plugin});
- }
-}
diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml
index 005f46a..9618371 100644
--- a/activemq-spring/pom.xml
+++ b/activemq-spring/pom.xml
@@ -78,21 +78,6 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.zookeeper-impl</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.util-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
<scope>provided</scope>
@@ -202,7 +187,6 @@
<include>${basedir}/../activemq-kahadb-store/src/main/java</include>
<include>${basedir}/../activemq-mqtt/src/main/java</include>
<include>${basedir}/../activemq-stomp/src/main/java</include>
- <include>${basedir}/../activemq-partition/src/main/java</include>
<include>${basedir}/../activemq-runtime-config/src/main/java</include>
</includes>
<strictXsdOrder>false</strictXsdOrder>
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index bbe633c..6184efb 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -60,10 +60,6 @@
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-partition</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-runtime-config</artifactId>
</dependency>
<dependency>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
deleted file mode 100644
index dcf4e69..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.partition;
-
-import junit.framework.TestCase;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.partition.PartitionBrokerPlugin;
-import org.apache.activemq.partition.dto.Partitioning;
-
-/**
- */
-public class SpringPartitionBrokerTest extends TestCase {
-
- public void testCreatePartitionBroker() throws Exception {
-
- BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml");
- assertEquals(1, broker.getPlugins().length);
- PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0];
- Partitioning config = plugin.getConfig();
- assertEquals(2, config.getBrokers().size());
-
- Object o;
- String json = "{\n" +
- " \"by_client_id\":{\n" +
- " \"client1\":{\"ids\":[\"broker1\"]},\n" +
- " \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" +
- " },\n" +
- " \"brokers\":{\n" +
- " \"broker1\":\"tcp://localhost:61616\",\n" +
- " \"broker2\":\"tcp://localhost:61616\"\n" +
- " }\n" +
- "}";
- Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class);
- assertEquals(expected.toString(), config.toString());
-
- }
-
-}
diff --git a/activemq-unit-tests/src/test/resources/activemq-partition.xml b/activemq-unit-tests/src/test/resources/activemq-partition.xml
deleted file mode 100644
index 4bb96f2..0000000
--- a/activemq-unit-tests/src/test/resources/activemq-partition.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!-- START SNIPPET: xbean -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
-
- <bean id="config" class="java.lang.String">
- <constructor-arg><value>
- <![CDATA[
- {
- "by_client_id":{
- "client1":{"ids":["broker1"]},
- "client2":{"ids":["broker1","broker2"]}
- },
- "brokers":{
- "broker1":"tcp://localhost:61616",
- "broker2":"tcp://localhost:61616"
- }
- }
- ]]>
- </value></constructor-arg>
- </bean>
-
- <broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false">
-
- <plugins>
- <partitionBrokerPlugin minTransferCount="5" configAsJson="#config"/>
- </plugins>
-
- <transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
- </transportConnectors>
-
- </broker>
-
-</beans>
-<!-- END SNIPPET: xbean -->
diff --git a/assembly/pom.xml b/assembly/pom.xml
index a5d2b0c..d1d30d4 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -55,10 +55,6 @@
<type>test-jar</type>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>activemq-partition</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
</dependency>
@@ -68,19 +64,6 @@
<artifactId>hawtdispatch-transport</artifactId>
<version>${hawtdispatch-version}</version>
</dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.zookeeper-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.util-core</artifactId>
- <version>${linkedin-zookeeper-version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </dependency>
<dependency>
<groupId>org.osgi</groupId>
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index f77fa1c..ff9c509 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,7 +182,6 @@
<include>${pom.groupId}:activemq-log4j-appender</include>
<include>${pom.groupId}:activemq-jms-pool</include>
<include>${pom.groupId}:activemq-pool</include>
- <include>${pom.groupId}:activemq-partition</include>
<include>${pom.groupId}:activemq-shiro</include>
<include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include>
diff --git a/pom.xml b/pom.xml
index 325985d..4d02cf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,8 +90,6 @@
<mqtt-client-version>1.16</mqtt-client-version>
<org-apache-derby-version>10.15.2.0</org-apache-derby-version>
<osgi-version>6.0.0</osgi-version>
- <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
- <zookeeper-version>3.4.14</zookeeper-version>
<qpid-proton-version>0.33.10</qpid-proton-version>
<qpid-jms-version>1.6.0</qpid-jms-version>
<netty-version>4.1.75.Final</netty-version>
@@ -229,7 +227,6 @@
<module>activemq-runtime-config</module>
<module>activemq-tooling</module>
<module>activemq-web</module>
- <module>activemq-partition</module>
<module>activemq-osgi</module>
<module>activemq-blueprint</module>
<module>activemq-web-demo</module>
@@ -316,11 +313,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-partition</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<version>${project.version}</version>
@@ -585,65 +577,6 @@
<version>${pax-logging-version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper-version}</version>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper-version}</version>
- <exclusions>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.zookeeper-impl</artifactId>
- <version>${linkedin-zookeeper-version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.linkedin</groupId>
- <artifactId>org.linkedin.util-core</artifactId>
- <version>${linkedin-zookeeper-version}</version>
- </dependency>
-
<!-- zeroconf transport -->
<dependency>
<groupId>org.jmdns</groupId>