[AMQ-9259] Remove activemq-partition and zookeeper test dependency
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 03d29cd..7b99b34 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -68,10 +68,6 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>activemq-http</artifactId>
     </dependency>
-    <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>activemq-partition</artifactId>
-    </dependency>
 
     <!-- Additional Dependencies. -->
     <dependency>
@@ -187,7 +183,6 @@
               org.codehaus.jettison*;resolution:=optional,
               org.jasypt*;resolution:=optional,
               org.eclipse.jetty*;resolution:=optional;version="[9.0,10)",
-              org.apache.zookeeper*;resolution:=optional,
               org.fusesource.hawtjni*;resolution:=optional,
               org.springframework.jms*;version="[4,6)";resolution:=optional,
               org.springframework.transaction*;version="[4,6)";resolution:=optional,
diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml
deleted file mode 100644
index 901c869..0000000
--- a/activemq-partition/pom.xml
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.activemq</groupId>
-    <artifactId>activemq-parent</artifactId>
-    <version>5.19.0-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>activemq-partition</artifactId>
-  <packaging>jar</packaging>
-
-  <name>ActiveMQ :: Partition Management</name>
-  <description>Used to partition clients over a cluster of brokers</description>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-broker</artifactId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>compile</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.zookeeper-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.util-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-    </dependency>
-
-    <!-- For Optional Snappy Compression -->
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <!-- Testing Dependencies -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j2-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-broker</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>activemq.tests-sanity</id>
-      <activation>
-        <property>
-          <name>activemq.tests</name>
-          <value>smoke</value>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <includes>
-                <include>**/PartitionBrokerTest.*</include>
-              </includes>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-	<profile>
-      <id>activemq.tests-autoTransport</id>
-      <activation>
-        <property>
-          <name>activemq.tests</name>
-          <value>autoTransport</value>
-        </property>
-      </activation>
-      <build>
-        <plugins>
-          <plugin>
-            <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludes>
-                <exclude>**</exclude>
-              </excludes>
-            </configuration>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-
-  </profiles>
-</project>
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
deleted file mode 100644
index 9362e64..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerFilter;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.activemq.partition.dto.Target;
-import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.util.LRUCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A BrokerFilter which partitions client connections over a cluster of brokers.
- *
- * It can use a client identifier like client id, authenticated user name, source ip
- * address or even destination being used by the connection to figure out which
- * is the best broker in the cluster that the connection should be using and then
- * redirects failover clients to that broker.
- */
-public class PartitionBroker extends BrokerFilter {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
-    protected final PartitionBrokerPlugin plugin;
-    protected boolean reloadConfigOnPoll = true;
-
-    public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
-        super(broker);
-        this.plugin = plugin;
-    }
-
-    @Override
-    public void start() throws Exception {
-        super.start();
-        getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                Thread.currentThread().setName("Partition Monitor");
-                onMonitorStart();
-                try {
-                    runPartitionMonitor();
-                } catch (Exception e) {
-                    onMonitorStop();
-                }
-            }
-        });
-    }
-
-    protected void onMonitorStart() {
-    }
-    protected void onMonitorStop() {
-    }
-
-    protected void runPartitionMonitor() {
-        while( !isStopped() ) {
-            try {
-                monitorWait();
-            } catch (InterruptedException e) {
-                break;
-            }
-
-            if(reloadConfigOnPoll) {
-                try {
-                    reloadConfiguration();
-                } catch (Exception e) {
-                    continue;
-                }
-            }
-
-            for( ConnectionMonitor monitor: monitors.values()) {
-                checkTarget(monitor);
-            }
-        }
-    }
-
-    protected void monitorWait() throws InterruptedException {
-        synchronized (this) {
-            this.wait(1000);
-        }
-    }
-
-    protected void monitorWakeup()  {
-        synchronized (this) {
-            this.notifyAll();
-        }
-    }
-
-    protected void reloadConfiguration() throws Exception {
-    }
-
-    protected void checkTarget(ConnectionMonitor monitor) {
-
-        // can we find a preferred target for the connection?
-        Target targetDTO = pickBestBroker(monitor);
-        if( targetDTO == null || targetDTO.ids==null) {
-            LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
-            return;
-        }
-
-        // Are we one the the targets?
-        if( targetDTO.ids.contains(getBrokerName()) ) {
-            LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
-            return;
-        }
-
-        // Then we need to move the connection over.
-        String connectionString = getConnectionString(targetDTO.ids);
-        if( connectionString==null ) {
-            LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
-            return;
-        }
-
-        LOG.info("Redirecting connection to: " + connectionString);
-        TransportConnection connection = (TransportConnection)monitor.context.getConnection();
-        ConnectionControl cc = new ConnectionControl();
-        cc.setConnectedBrokers(connectionString);
-        cc.setRebalanceConnection(true);
-        connection.dispatchAsync(cc);
-    }
-
-    protected String getConnectionString(HashSet<String> ids) {
-        StringBuilder rc = new StringBuilder();
-        for (String id : ids) {
-            String url = plugin.getBrokerURL(this, id);
-            if( url!=null ) {
-                if( rc.length()!=0 ) {
-                    rc.append(',');
-                }
-                rc.append(url);
-            }
-        }
-        if( rc.length()==0 )
-            return null;
-        return rc.toString();
-    }
-
-    static private class Score {
-        int value;
-    }
-
-    protected Target pickBestBroker(ConnectionMonitor monitor) {
-
-        if( getConfig() ==null )
-            return null;
-
-        if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
-            TransportConnection connection = (TransportConnection)monitor.context.getConnection();
-            Transport transport = connection.getTransport();
-            Socket socket = transport.narrow(Socket.class);
-            if( socket !=null ) {
-                SocketAddress address = socket.getRemoteSocketAddress();
-                if( address instanceof InetSocketAddress) {
-                    String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
-                    Target targetDTO = getConfig().bySourceIp.get(ip);
-                    if( targetDTO!=null ) {
-                        return targetDTO;
-                    }
-                }
-            }
-        }
-
-        if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
-            String userName = monitor.context.getUserName();
-            if( userName !=null ) {
-                Target targetDTO = getConfig().byUserName.get(userName);
-                if( targetDTO!=null ) {
-                    return targetDTO;
-                }
-            }
-        }
-
-        if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
-            String clientId = monitor.context.getClientId();
-            if( clientId!=null ) {
-                Target targetDTO = getConfig().byClientId.get(clientId);
-                if( targetDTO!=null ) {
-                    return targetDTO;
-                }
-            }
-        }
-
-        if(
-             (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
-          || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
-          ) {
-
-            // Collect the destinations the connection is consuming from...
-            HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
-            for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
-                for (ConsumerState consumer : session.getConsumerStates()) {
-                    ActiveMQDestination destination = consumer.getInfo().getDestination();
-                    if( destination.isComposite() ) {
-                        dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
-                    } else {
-                        dests.addAll(Collections.singletonList(destination));
-                    }
-                }
-            }
-
-            // Group them by the partitioning target for the destinations and score them..
-            HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
-            for (ActiveMQDestination dest : dests) {
-                Target target = getTarget(dest);
-                if( target!=null ) {
-                    Score score = targetScores.get(target);
-                    if( score == null ) {
-                        score = new Score();
-                        targetScores.put(target, score);
-                    }
-                    score.value++;
-                }
-            }
-
-            // The target with largest score wins..
-            if (!targetScores.isEmpty()) {
-                Target bestTarget = null;
-                int bestScore = 0;
-                for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
-                    if (entry.getValue().value > bestScore) {
-                        bestTarget = entry.getKey();
-                        bestScore = entry.getValue().value;
-                    }
-                }
-                return bestTarget;
-            }
-
-            // If we get here is because there were no consumers, or the destinations for those
-            // consumers did not have an assigned destination..  So partition based on producer
-            // usage.
-            Target best = monitor.findBestProducerTarget(this);
-            if( best!=null ) {
-                return best;
-            }
-        }
-        return null;
-    }
-
-    protected Target getTarget(ActiveMQDestination dest) {
-        Partitioning config = getConfig();
-        if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
-            return config.byQueue.get(dest.getPhysicalName());
-        } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
-            return config.byTopic.get(dest.getPhysicalName());
-        }
-        return null;
-    }
-
-    protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
-
-    @Override
-    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
-        if( info.isFaultTolerant() ) {
-            ConnectionMonitor monitor = new ConnectionMonitor(context);
-            monitors.put(info.getConnectionId(), monitor);
-            super.addConnection(context, info);
-            checkTarget(monitor);
-        } else {
-            super.addConnection(context, info);
-        }
-    }
-
-    @Override
-    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
-        super.removeConnection(context, info, error);
-        if( info.isFaultTolerant() ) {
-            monitors.remove(info.getConnectionId());
-        }
-    }
-
-    @Override
-    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
-        ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
-        if( monitor!=null ) {
-            monitor.onSend(producerExchange, messageSend);
-        }
-    }
-
-    protected Partitioning getConfig() {
-        return plugin.getConfig();
-    }
-
-
-    static class Traffic {
-        long messages;
-        long bytes;
-    }
-
-    static class ConnectionMonitor {
-
-        final ConnectionContext context;
-        LRUCache<ActiveMQDestination, Traffic> trafficPerDestination =  new LRUCache<ActiveMQDestination, Traffic>();
-
-        public ConnectionMonitor(ConnectionContext context) {
-            this.context = context;
-        }
-
-        synchronized public Target findBestProducerTarget(PartitionBroker broker) {
-            Target best = null;
-            long bestSize = 0 ;
-            for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
-                Traffic t = entry.getValue();
-                // Once we get enough messages...
-                if( t.messages < broker.plugin.getMinTransferCount()) {
-                    continue;
-                }
-                if( t.bytes > bestSize) {
-                    bestSize = t.bytes;
-                    Target target = broker.getTarget(entry.getKey());
-                    if( target!=null ) {
-                        best = target;
-                    }
-                }
-            }
-            return best;
-        }
-
-        synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
-            ActiveMQDestination dest = message.getDestination();
-            Traffic traffic = trafficPerDestination.get(dest);
-            if( traffic == null ) {
-                traffic = new Traffic();
-                trafficPerDestination.put(dest, traffic);
-            }
-            traffic.messages += 1;
-            traffic.bytes += message.getSize();
-        }
-
-
-    }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
deleted file mode 100644
index 418f564..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.partition.dto.Partitioning;
-
-import java.io.IOException;
-
-/**
- * A BrokerPlugin which partitions client connections over a cluster of brokers.
- *
- * @org.apache.xbean.XBean element="partitionBrokerPlugin"
- */
-public class PartitionBrokerPlugin implements BrokerPlugin {
-
-    protected int minTransferCount;
-    protected Partitioning config;
-
-    @Override
-    public Broker installPlugin(Broker broker) throws Exception {
-        return new PartitionBroker(broker, this);
-    }
-
-    public int getMinTransferCount() {
-        return minTransferCount;
-    }
-
-    public void setMinTransferCount(int minTransferCount) {
-        this.minTransferCount = minTransferCount;
-    }
-
-    public Partitioning getConfig() {
-        return config;
-    }
-
-    public void setConfig(Partitioning config) {
-        this.config = config;
-    }
-
-    public void setConfigAsJson(String config) throws IOException {
-        this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
-    }
-
-    public String getBrokerURL(PartitionBroker partitionBroker, String id) {
-        if( config!=null && config.brokers!=null ) {
-            return config.brokers.get(id);
-        }
-        return null;
-    }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java
deleted file mode 100644
index 2baec62..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.zookeeper.*;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.data.Stat;
-import org.linkedin.util.clock.Clock;
-import org.linkedin.util.clock.SystemClock;
-import org.linkedin.util.clock.Timespan;
-import org.linkedin.util.concurrent.ConcurrentUtils;
-import org.linkedin.util.io.PathUtils;
-import org.linkedin.zookeeper.client.*;
-import org.slf4j.Logger;
-
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class ZKClient  extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
-
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);
-
-    private Map<String, String> acls;
-    private String password;
-
-    public void start() throws Exception {
-        // Grab the lock to make sure that the registration of the ManagedService
-        // won't be updated immediately but that the initial update will happen first
-        synchronized (_lock) {
-            _stateChangeDispatcher.setDaemon(true);
-            _stateChangeDispatcher.start();
-            doStart();
-        }
-    }
-
-    public void setACLs(Map<String, String> acls) {
-        this.acls = acls;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    protected void doStart() throws UnsupportedEncodingException {
-        connect();
-    }
-
-    @Override
-    public void close() {
-        if (_stateChangeDispatcher != null) {
-            _stateChangeDispatcher.end();
-            try {
-                _stateChangeDispatcher.join(1000);
-            } catch (Exception e) {
-                LOG.debug("ignored exception", e);
-            }
-        }
-        synchronized(_lock) {
-            if (_zk != null) {
-                try {
-                    changeState(State.NONE);
-                    _zk.close();
-                    Thread th = getSendThread();
-                    if (th != null) {
-                        th.join(1000);
-                    }
-                    _zk = null;
-                } catch (Exception e) {
-                    LOG.debug("ignored exception", e);
-                }
-            }
-        }
-    }
-
-    protected Thread getSendThread() {
-        try {
-            return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
-        } catch (Throwable e) {
-            return null;
-        }
-    }
-
-    protected Object getField(Object obj, String... names) throws Exception {
-        for (String name : names) {
-            obj = getField(obj, name);
-        }
-        return obj;
-    }
-
-    protected Object getField(Object obj, String name) throws Exception {
-        Class clazz = obj.getClass();
-        while (clazz != null) {
-            for (Field f : clazz.getDeclaredFields()) {
-                if (f.getName().equals(name)) {
-                    f.setAccessible(true);
-                    return f.get(obj);
-                }
-            }
-        }
-        throw new NoSuchFieldError(name);
-    }
-
-    protected void changeState(State newState) {
-        synchronized (_lock) {
-            State oldState = _state;
-            if (oldState != newState) {
-                _stateChangeDispatcher.addEvent(oldState, newState);
-                _state = newState;
-                _lock.notifyAll();
-            }
-        }
-    }
-
-    public void testGenerateConnectionLoss() throws Exception {
-        waitForConnected();
-        Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
-        callMethod(clientCnxnSocket, "testableCloseSocket");
-    }
-
-    protected Object callMethod(Object obj, String name, Object... args) throws Exception {
-        Class clazz = obj.getClass();
-        while (clazz != null) {
-            for (Method m : clazz.getDeclaredMethods()) {
-                if (m.getName().equals(name)) {
-                    m.setAccessible(true);
-                    return m.invoke(obj, args);
-                }
-            }
-        }
-        throw new NoSuchMethodError(name);
-    }
-
-    protected void tryConnect() {
-        synchronized (_lock) {
-            try {
-                connect();
-            } catch (Throwable e) {
-                LOG.warn("Error while restarting:", e);
-                if (_expiredSessionRecovery == null) {
-                    _expiredSessionRecovery = new ExpiredSessionRecovery();
-                    _expiredSessionRecovery.setDaemon(true);
-                    _expiredSessionRecovery.start();
-                }
-            }
-        }
-    }
-
-    public void connect() throws UnsupportedEncodingException {
-        synchronized (_lock) {
-            changeState(State.CONNECTING);
-            _zk = _factory.createZooKeeper(this);
-            if (password != null) {
-                _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
-            }
-        }
-    }
-
-    public void process(WatchedEvent event) {
-        if (event.getState() != null) {
-            LOG.debug("event: {}", event.getState());
-            synchronized (_lock) {
-                switch(event.getState()) {
-                    case SyncConnected:
-                        changeState(State.CONNECTED);
-                        break;
-                    case Disconnected:
-                        if (_state != State.NONE) {
-                            changeState(State.RECONNECTING);
-                        }
-                        break;
-                    case Expired:
-                        // when expired, the zookeeper object is invalid and we need to recreate a new one
-                        _zk = null;
-                        LOG.warn("Expiration detected: trying to restart...");
-                        tryConnect();
-                        break;
-                    default:
-                        LOG.warn("Unsupported event state: {}", event.getState());
-                }
-            }
-        }
-    }
-
-    @Override
-    protected IZooKeeper getZk() {
-        State state = _state;
-        if (state == State.NONE) {
-            throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
-        } else if (state != State.CONNECTING) {
-            try {
-                waitForConnected();
-            } catch (Exception e) {
-                throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
-            }
-        }
-        IZooKeeper zk = _zk;
-        if (zk == null) {
-            throw new IllegalStateException("No ZooKeeper connection available");
-        }
-        return zk;
-    }
-
-    public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
-        waitForState(State.CONNECTED, timeout);
-    }
-
-    public void waitForConnected() throws InterruptedException, TimeoutException {
-        waitForConnected(null);
-    }
-
-    public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
-        long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
-        if (_state != state) {
-            synchronized (_lock) {
-                while (_state != state) {
-                    ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
-                }
-            }
-        }
-    }
-
-    @Override
-    public void registerListener(LifecycleListener listener) {
-        if (listener == null) {
-            throw new IllegalStateException("listener is null");
-        }
-        if (!_listeners.contains(listener)) {
-            _listeners.add(listener);
-        }
-        if (_state == State.CONNECTED) {
-            listener.onConnected();
-            //_stateChangeDispatcher.addEvent(null, State.CONNECTED);
-        }
-    }
-
-    @Override
-    public void removeListener(LifecycleListener listener) {
-        if (listener == null) {
-            throw new IllegalStateException("listener is null");
-        }
-        _listeners.remove(listener);
-    }
-
-    @Override
-    public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
-        return new ChrootedZKClient(this, adjustPath(path));
-    }
-
-    @Override
-    public boolean isConnected() {
-        return _state == State.CONNECTED;
-    }
-
-    public boolean isConfigured() {
-        return _state != State.NONE;
-    }
-
-    @Override
-    public String getConnectString() {
-        return _factory.getConnectString();
-    }
-
-    public static enum State {
-        NONE,
-        CONNECTING,
-        CONNECTED,
-        RECONNECTING
-    }
-
-    private final static String CHARSET = "UTF-8";
-
-    private final Clock _clock = SystemClock.instance();
-    private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();
-
-    protected final Object _lock = new Object();
-    protected volatile State _state = State.NONE;
-
-    private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
-
-    protected IZooKeeperFactory _factory;
-    protected IZooKeeper _zk;
-    protected Timespan _reconnectTimeout = Timespan.parse("20s");
-    protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
-
-    private ExpiredSessionRecovery _expiredSessionRecovery = null;
-
-    private class StateChangeDispatcher extends Thread {
-        private final AtomicBoolean _running = new AtomicBoolean(true);
-        private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();
-
-        private StateChangeDispatcher() {
-            super("ZooKeeper state change dispatcher thread");
-        }
-
-        @Override
-        public void run() {
-            Map<Object, Boolean> history = new IdentityHashMap<>();
-            LOG.info("Starting StateChangeDispatcher");
-            while (_running.get()) {
-                Boolean isConnectedEvent;
-                try {
-                    isConnectedEvent = _events.take();
-                } catch (InterruptedException e) {
-                    continue;
-                }
-                if (!_running.get() || isConnectedEvent == null) {
-                    continue;
-                }
-                Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
-                // we save which event each listener has seen last
-                // we don't update the map in place because we need to get rid of unregistered listeners
-                history = newHistory;
-            }
-            LOG.info("StateChangeDispatcher terminated.");
-        }
-
-        public void end() {
-            _running.set(false);
-            _events.add(false);
-        }
-
-        public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
-            LOG.debug("addEvent: {} => {}", oldState, newState);
-            if (newState == ZKClient.State.CONNECTED) {
-                _events.add(true);
-            } else if (oldState == ZKClient.State.CONNECTED) {
-                _events.add(false);
-            }
-        }
-    }
-
-    protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
-        Map<Object, Boolean> newHistory = new IdentityHashMap<>();
-        for (LifecycleListener listener : _listeners) {
-            Boolean previousEvent = history.get(listener);
-            // we propagate the event only if it was not already sent
-            if (previousEvent == null || previousEvent != connectedEvent) {
-                try {
-                    if (connectedEvent) {
-                        listener.onConnected();
-                    } else {
-                        listener.onDisconnected();
-                    }
-                } catch (Throwable e) {
-                    LOG.warn("Exception while executing listener (ignored)", e);
-                }
-            }
-            newHistory.put(listener, connectedEvent);
-        }
-        return newHistory;
-    }
-
-    private class ExpiredSessionRecovery extends Thread {
-
-        private ExpiredSessionRecovery() {
-            super("ZooKeeper expired session recovery thread");
-        }
-
-        @Override
-        public void run() {
-            LOG.info("Entering recovery mode");
-            synchronized (_lock) {
-                try {
-                    int count = 0;
-                    while (_state == ZKClient.State.NONE) {
-                        try {
-                            count++;
-                            LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
-                            ZKClient.this.connect();
-                        } catch (Throwable e) {
-                            LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
-                            try {
-                                _lock.wait(_reconnectTimeout.getDurationInMilliseconds());
-                            } catch (InterruptedException e1) {
-                                throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
-                            }
-                        }
-                    }
-                } finally {
-                    _expiredSessionRecovery = null;
-                    LOG.info("Exiting recovery mode.");
-                }
-            }
-        }
-
-    }
-
-    public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
-        this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
-    }
-
-    public ZKClient(IZooKeeperFactory factory) {
-        this(factory, null);
-    }
-
-    public ZKClient(IZooKeeperFactory factory, String chroot) {
-        super(chroot);
-        _factory = factory;
-        Map<String, String> acls = new HashMap<>();
-        acls.put("/", "world:anyone:acdrw");
-        setACLs(acls);
-    }
-
-    static private int getPermFromString(String permString) {
-        int perm = 0;
-        for (int i = 0; i < permString.length(); i++) {
-            switch (permString.charAt(i)) {
-                case 'r':
-                    perm |= ZooDefs.Perms.READ;
-                    break;
-                case 'w':
-                    perm |= ZooDefs.Perms.WRITE;
-                    break;
-                case 'c':
-                    perm |= ZooDefs.Perms.CREATE;
-                    break;
-                case 'd':
-                    perm |= ZooDefs.Perms.DELETE;
-                    break;
-                case 'a':
-                    perm |= ZooDefs.Perms.ADMIN;
-                    break;
-                default:
-                    System.err.println("Unknown perm type:" + permString.charAt(i));
-            }
-        }
-        return perm;
-    }
-
-    private static List<ACL> parseACLs(String aclString) {
-        List<ACL>  acl;
-        String acls[] = aclString.split(",");
-        acl = new ArrayList<>();
-        for (String a : acls) {
-            int firstColon = a.indexOf(':');
-            int lastColon = a.lastIndexOf(':');
-            if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
-                System.err.println(a + " does not have the form scheme:id:perm");
-                continue;
-            }
-            ACL newAcl = new ACL();
-            newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
-            newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
-            acl.add(newAcl);
-        }
-        return acl;
-    }
-
-    public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
-        if (exists(path) != null) {
-            return setByteData(path, data);
-        }
-        try {
-            createBytesNodeWithParents(path, data, acl, createMode);
-            return null;
-        } catch (KeeperException.NodeExistsException e) {
-            // this should not happen very often (race condition)
-            return setByteData(path, data);
-        }
-    }
-
-    public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
-        return create(path, (byte[]) null, createMode);
-    }
-
-    public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
-        return create(path, toByteData(data), createMode);
-    }
-
-    public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
-        return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
-    }
-
-    public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
-        return createWithParents(path, (byte[]) null, createMode);
-    }
-
-    public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
-        return createWithParents(path, toByteData(data), createMode);
-    }
-
-    public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
-        createParents(path);
-        return create(path, data, createMode);
-    }
-
-    public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
-        return createOrSetWithParents(path, toByteData(data), createMode);
-    }
-
-    public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
-        if (exists(path) != null) {
-            return setByteData(path, data);
-        }
-        try {
-            createWithParents(path, data, createMode);
-            return null;
-        } catch (KeeperException.NodeExistsException e) {
-            // this should not happen very often (race condition)
-            return setByteData(path, data);
-        }
-    }
-
-    public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
-        if (exists(path) != null) {
-            doFixACLs(path, recursive);
-        }
-    }
-
-    private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
-        setACL(path, getNodeACLs(path), -1);
-        if (recursive) {
-            for (String child : getChildren(path)) {
-                doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
-            }
-        }
-    }
-
-    private List<ACL> getNodeACLs(String path) {
-        String acl = doGetNodeACLs(adjustPath(path));
-        if (acl == null) {
-            throw new IllegalStateException("Could not find matching ACLs for " + path);
-        }
-        return parseACLs(acl);
-    }
-
-    protected String doGetNodeACLs(String path) {
-        String longestPath = "";
-        for (String acl : acls.keySet()) {
-            if (acl.length() > longestPath.length() && path.startsWith(acl)) {
-                longestPath = acl;
-            }
-        }
-        return acls.get(longestPath);
-    }
-
-    private void createParents(String path) throws InterruptedException, KeeperException {
-        path = PathUtils.getParentPath(adjustPath(path));
-        path = PathUtils.removeTrailingSlash(path);
-        List<String> paths = new ArrayList<>();
-        while (!path.equals("") && getZk().exists(path, false) == null) {
-            paths.add(path);
-            path = PathUtils.getParentPath(path);
-            path = PathUtils.removeTrailingSlash(path);
-        }
-        Collections.reverse(paths);
-        for (String p : paths) {
-            try {
-                getZk().create(p,
-                        null,
-                        getNodeACLs(p),
-                        CreateMode.PERSISTENT);
-            } catch (KeeperException.NodeExistsException e) {
-                // ok we continue...
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("parent already exists " + p);
-                }
-            }
-        }
-    }
-
-    private byte[] toByteData(String data) {
-        if (data == null) {
-            return null;
-        } else {
-            try {
-                return data.getBytes(CHARSET);
-            } catch (UnsupportedEncodingException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
deleted file mode 100644
index 6d2474b..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.data.Stat;
-import org.linkedin.util.clock.Timespan;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- */
-public class ZooKeeperPartitionBroker extends PartitionBroker {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
-
-    protected volatile ZKClient zk_client = null;
-    protected volatile Partitioning config;
-    protected final CountDownLatch configAcquired = new CountDownLatch(1);
-
-    public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
-        super(broker, plugin);
-    }
-
-    @Override
-    public void start() throws Exception {
-        super.start();
-        // Lets block a bit until we get our config.. Otherwise just keep
-        // on going.. not a big deal if we get our config later.  Perhaps
-        // ZK service is not having a good day.
-        configAcquired.await(5, TimeUnit.SECONDS);
-    }
-
-    @Override
-    protected void onMonitorStop() {
-        zkDisconnect();
-    }
-
-    @Override
-    protected Partitioning getConfig() {
-        return config;
-    }
-
-    protected ZooKeeperPartitionBrokerPlugin plugin() {
-        return (ZooKeeperPartitionBrokerPlugin)plugin;
-    }
-
-    protected void zkConnect() throws Exception {
-        zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
-        if( plugin().getZkPassword()!=null ) {
-            zk_client.setPassword(plugin().getZkPassword());
-        }
-        zk_client.start();
-        zk_client.waitForConnected(Timespan.parse("30s"));
-    }
-
-    protected void zkDisconnect() {
-        if( zk_client!=null ) {
-            zk_client.close();
-            zk_client = null;
-        }
-    }
-
-    protected void reloadConfiguration() throws Exception {
-        if( zk_client==null )  {
-            LOG.debug("Connecting to ZooKeeper");
-            try {
-                zkConnect();
-                LOG.debug("Connected to ZooKeeper");
-            } catch (Exception e) {
-                LOG.debug("Connection to ZooKeeper failed: "+e);
-                zkDisconnect();
-                throw e;
-            }
-        }
-
-        byte[] data = null;
-        try {
-            Stat stat = new Stat();
-            data = zk_client.getData(plugin().getZkPath(), new Watcher() {
-                @Override
-                public void process(WatchedEvent watchedEvent) {
-                    try {
-                        reloadConfiguration();
-                    } catch (Exception e) {
-                    }
-                    monitorWakeup();
-                }
-            }, stat);
-            configAcquired.countDown();
-            reloadConfigOnPoll = false;
-        } catch (Exception e) {
-            LOG.warn("Could load partitioning configuration: " + e, e);
-            reloadConfigOnPoll = true;
-        }
-
-        try {
-            config = Partitioning.MAPPER.readValue(data, Partitioning.class);
-        } catch (Exception e) {
-            LOG.warn("Invalid partitioning configuration: " + e, e);
-        }
-    }
-
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
deleted file mode 100644
index 34fa0fc..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerPlugin;
-
-/**
- * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
- */
-public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
-
-    String zkAddress = "127.0.0.1:2181";
-    String zkPassword;
-    String zkPath = "/broker-assignments";
-    String zkSessionTmeout = "10s";
-
-    @Override
-    public Broker installPlugin(Broker broker) throws Exception {
-        return new ZooKeeperPartitionBroker(broker, this);
-    }
-
-    public String getZkAddress() {
-        return zkAddress;
-    }
-
-    public void setZkAddress(String zkAddress) {
-        this.zkAddress = zkAddress;
-    }
-
-    public String getZkPassword() {
-        return zkPassword;
-    }
-
-    public void setZkPassword(String zkPassword) {
-        this.zkPassword = zkPassword;
-    }
-
-    public String getZkPath() {
-        return zkPath;
-    }
-
-    public void setZkPath(String zkPath) {
-        this.zkPath = zkPath;
-    }
-
-    public String getZkSessionTmeout() {
-        return zkSessionTmeout;
-    }
-
-    public void setZkSessionTmeout(String zkSessionTmeout) {
-        this.zkSessionTmeout = zkSessionTmeout;
-    }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
deleted file mode 100644
index 43f7924..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition.dto;
-
-
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.DeserializationConfig;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-/**
- * The main Configuration class for the PartitionBroker plugin
- */
-public class Partitioning {
-
-    static final public ObjectMapper MAPPER = new ObjectMapper();
-    static {
-        MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-    }
-
-    static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
-    static {
-        TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-        TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
-    }
-
-    /**
-     * If a client connects with a clientId which is listed in the
-     * map, then he will be immediately reconnected
-     * to the partition target immediately.
-     */
-    @JsonProperty("by_client_id")
-    @JsonDeserialize(contentAs = Target.class)
-    public HashMap<String, Target> byClientId;
-
-    /**
-     * If a client connects with a user priciple which is listed in the
-     * map, then he will be immediately reconnected
-     * to the partition target immediately.
-     */
-    @JsonProperty("by_user_name")
-    @JsonDeserialize(contentAs = Target.class)
-    public HashMap<String, Target> byUserName;
-
-    /**
-     * If a client connects with source ip which is listed in the
-     * map, then he will be immediately reconnected
-     * to the partition target immediately.
-     */
-    @JsonProperty("by_source_ip")
-    @JsonDeserialize(contentAs = Target.class)
-    public HashMap<String, Target> bySourceIp;
-
-    /**
-     * Used to map the preferred partitioning of queues across
-     * a set of brokers.  Once a it is deemed that a connection mostly
-     * works with a set of targets configured in this map, the client
-     * will be reconnected to the appropriate target.
-     */
-    @JsonProperty("by_queue")
-    @JsonDeserialize(contentAs = Target.class)
-    public HashMap<String, Target> byQueue;
-
-    /**
-     * Used to map the preferred partitioning of topics across
-     * a set of brokers.  Once a it is deemed that a connection mostly
-     * works with a set of targets configured in this map, the client
-     * will be reconnected to the appropriate target.
-     */
-    @JsonProperty("by_topic")
-    @JsonDeserialize(contentAs = Target.class)
-    public HashMap<String, Target> byTopic;
-
-    /**
-     * Maps broker names to broker URLs.
-     */
-    @JsonProperty("brokers")
-    @JsonDeserialize(contentAs = String.class)
-    public HashMap<String, String> brokers;
-
-
-    @Override
-    public String toString() {
-        try {
-            return TO_STRING_MAPPER.writeValueAsString(this);
-        } catch (IOException e) {
-            return super.toString();
-        }
-    }
-
-    public HashMap<String, String> getBrokers() {
-        return brokers;
-    }
-
-    public void setBrokers(HashMap<String, String> brokers) {
-        this.brokers = brokers;
-    }
-
-    public HashMap<String, Target> getByClientId() {
-        return byClientId;
-    }
-
-    public void setByClientId(HashMap<String, Target> byClientId) {
-        this.byClientId = byClientId;
-    }
-
-    public HashMap<String, Target> getByQueue() {
-        return byQueue;
-    }
-
-    public void setByQueue(HashMap<String, Target> byQueue) {
-        this.byQueue = byQueue;
-    }
-
-    public HashMap<String, Target> getBySourceIp() {
-        return bySourceIp;
-    }
-
-    public void setBySourceIp(HashMap<String, Target> bySourceIp) {
-        this.bySourceIp = bySourceIp;
-    }
-
-    public HashMap<String, Target> getByTopic() {
-        return byTopic;
-    }
-
-    public void setByTopic(HashMap<String, Target> byTopic) {
-        this.byTopic = byTopic;
-    }
-
-    public HashMap<String, Target> getByUserName() {
-        return byUserName;
-    }
-
-    public void setByUserName(HashMap<String, Target> byUserName) {
-        this.byUserName = byUserName;
-    }
-}
diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
deleted file mode 100644
index 79b53ef..0000000
--- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition.dto;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-
-/**
- * Represents a partition target.  This identifies the brokers that
- * a partition lives on.
- */
-public class Target {
-
-    @JsonProperty("ids")
-    public HashSet<String> ids = new HashSet<String>();
-
-    public Target() {
-        ids = new HashSet<String>();
-    }
-
-    public Target(String ...ids) {
-        this.ids.addAll(java.util.Arrays.asList(ids));
-    }
-
-    @Override
-    public String toString() {
-        try {
-            return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
-        } catch (IOException e) {
-            return super.toString();
-        }
-    }
-
-    public HashSet<String> getIds() {
-        return ids;
-    }
-
-    public void setIds(Collection<String> ids) {
-        this.ids = new HashSet<String>(ids);
-    }
-
-}
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
deleted file mode 100644
index 1b49f0b..0000000
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.partition.dto.Partitioning;
-import org.apache.activemq.partition.dto.Target;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import javax.jms.*;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.*;
-
-/**
- * Unit tests for the PartitionBroker plugin.
- */
-public class PartitionBrokerTest {
-
-    protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
-    protected ArrayList<Connection> connections = new ArrayList<Connection>();
-    Partitioning partitioning;
-
-    @Before
-    public void setUp() throws Exception {
-        partitioning = new Partitioning();
-        partitioning.brokers = new HashMap<String, String>();
-    }
-
-    /**
-     * Partitioning can only re-direct failover clients since those
-     * can re-connect and re-establish their state with another broker.
-     */
-    @Test(timeout = 1000*60*60)
-    public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
-
-        partitioning.byClientId = new HashMap<String, Target>();
-        partitioning.byClientId.put("client1", new Target("broker1"));
-        createBrokerCluster(2);
-
-        Connection connection = createConnectionToUrl(getConnectURL("broker2"));
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(0, getTransportConnector("broker1").getConnections().size());
-                assertEquals(1, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-
-        connection.setClientID("client1");
-        connection.start();
-
-        Thread.sleep(1000);
-        assertEquals(0, getTransportConnector("broker1").getConnections().size());
-        assertEquals(1, getTransportConnector("broker2").getConnections().size());
-    }
-
-    @Test(timeout = 1000*60*60)
-    public void testPartitionByClientId() throws Exception {
-        partitioning.byClientId = new HashMap<String, Target>();
-        partitioning.byClientId.put("client1", new Target("broker1"));
-        partitioning.byClientId.put("client2", new Target("broker2"));
-        createBrokerCluster(2);
-
-        Connection connection = createConnectionTo("broker2");
-
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(0, getTransportConnector("broker1").getConnections().size());
-                assertEquals(1, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-
-        connection.setClientID("client1");
-        connection.start();
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(1, getTransportConnector("broker1").getConnections().size());
-                assertEquals(0, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-    }
-
-    @Test(timeout = 1000*60*60)
-    public void testPartitionByQueue() throws Exception {
-        partitioning.byQueue = new HashMap<String, Target>();
-        partitioning.byQueue.put("foo", new Target("broker1"));
-        createBrokerCluster(2);
-
-        Connection connection2 = createConnectionTo("broker2");
-
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(0, getTransportConnector("broker1").getConnections().size());
-                assertEquals(1, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-
-        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
-
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(1, getTransportConnector("broker1").getConnections().size());
-                assertEquals(0, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-
-        Connection connection1 = createConnectionTo("broker2");
-        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
-
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(1, getTransportConnector("broker1").getConnections().size());
-                assertEquals(1, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-
-        for (int i = 0; i < 100; i++) {
-            producer.send(session1.createTextMessage("#" + i));
-        }
-
-        within(5, TimeUnit.SECONDS, new Task() {
-            public void run() throws Exception {
-                assertEquals(2, getTransportConnector("broker1").getConnections().size());
-                assertEquals(0, getTransportConnector("broker2").getConnections().size());
-            }
-        });
-    }
-
-
-    static interface Task {
-        public void run() throws Exception;
-    }
-
-    private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
-        long timeMS = unit.toMillis(time);
-        long deadline = System.currentTimeMillis() + timeMS;
-        while (true) {
-            try {
-                task.run();
-                return;
-            } catch (Throwable e) {
-                long remaining = deadline - System.currentTimeMillis();
-                if( remaining <=0 ) {
-                    if( e instanceof RuntimeException ) {
-                        throw (RuntimeException)e;
-                    }
-                    if( e instanceof Error ) {
-                        throw (Error)e;
-                    }
-                    throw new RuntimeException(e);
-                }
-                Thread.sleep(Math.min(timeMS/10, remaining));
-            }
-        }
-    }
-
-    protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
-        return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false");
-    }
-
-    private Connection createConnectionToUrl(String url) throws JMSException {
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
-        Connection connection = factory.createConnection();
-        connections.add(connection);
-        return connection;
-    }
-
-    protected String getConnectURL(String broker) throws IOException, URISyntaxException {
-        TransportConnector tcp = getTransportConnector(broker);
-        return tcp.getConnectUri().toString();
-    }
-
-    private TransportConnector getTransportConnector(String broker) {
-        BrokerService brokerService = brokers.get(broker);
-        if( brokerService==null ) {
-            throw new IllegalArgumentException("Invalid broker id");
-        }
-        return brokerService.getTransportConnectorByName("tcp");
-    }
-
-    protected void createBrokerCluster(int brokerCount) throws Exception {
-        for (int i = 1; i <= brokerCount; i++) {
-            String brokerId = "broker" + i;
-            BrokerService broker = createBroker(brokerId);
-            broker.setPersistent(false);
-            broker.addConnector("tcp://localhost:0").setName("tcp");
-            addPartitionBrokerPlugin(broker);
-            broker.start();
-            broker.waitUntilStarted();
-            partitioning.brokers.put(brokerId, getConnectURL(brokerId));
-        }
-    }
-
-    protected void addPartitionBrokerPlugin(BrokerService broker) {
-        PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
-        plugin.setConfig(partitioning);
-        broker.setPlugins(new BrokerPlugin[]{plugin});
-    }
-
-    protected BrokerService createBroker(String name) {
-        BrokerService broker = new BrokerService();
-        broker.setBrokerName(name);
-        brokers.put(name, broker);
-        return broker;
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        for (Connection connection : connections) {
-            try {
-                connection.close();
-            } catch (Throwable e) {
-            }
-        }
-        connections.clear();
-        for (BrokerService broker : brokers.values()) {
-            try {
-                broker.stop();
-                broker.waitUntilStopped();
-            } catch (Throwable e) {
-            }
-        }
-        brokers.clear();
-    }
-
-}
diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
deleted file mode 100644
index 0a6416b..0000000
--- a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.partition;
-
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.junit.After;
-import org.junit.Before;
-import org.linkedin.util.clock.Timespan;
-
-import java.io.File;
-import java.net.InetSocketAddress;
-
-/**
- */
-public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
-
-    NIOServerCnxnFactory connector;
-
-    @Before
-    public void setUp() throws Exception {
-        System.out.println("Starting ZooKeeper");
-        ZooKeeperServer zk_server = new ZooKeeperServer();
-        zk_server.setTickTime(500);
-        zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
-        connector = new NIOServerCnxnFactory();
-        connector.configure(new InetSocketAddress(0), 100);
-        connector.startup(zk_server);
-        System.out.println("ZooKeeper started");
-        super.setUp();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        if( connector!=null ) {
-          connector.shutdown();
-          connector = null;
-        }
-    }
-
-    String zkPath = "/partition-config";
-
-    @Override
-    protected void createBrokerCluster(int brokerCount) throws Exception {
-        // Store the partitioning in ZK.
-        ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
-        try {
-            zk_client.start();
-            zk_client.waitForConnected(Timespan.parse("30s"));
-            try {
-                zk_client.delete(zkPath);
-            } catch (Throwable e) {
-            }
-            zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
-        } finally {
-            zk_client.close();
-        }
-        super.createBrokerCluster(brokerCount);
-    }
-
-    @Override
-    protected void addPartitionBrokerPlugin(BrokerService broker) {
-        // Have the borker plugin get the partition config via ZK.
-        ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
-            @Override
-            public String getBrokerURL(PartitionBroker partitionBroker, String id) {
-                try {
-                    return getConnectURL(id);
-                } catch (Exception e) {
-                    return null;
-                }
-            }
-        };
-        plugin.setZkAddress("localhost:" + connector.getLocalPort());
-        plugin.setZkPath(zkPath);
-        broker.setPlugins(new BrokerPlugin[]{plugin});
-    }
-}
diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml
index 005f46a..9618371 100644
--- a/activemq-spring/pom.xml
+++ b/activemq-spring/pom.xml
@@ -78,21 +78,6 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.zookeeper-impl</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.util-core</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>osgi.core</artifactId>
       <scope>provided</scope>
@@ -202,7 +187,6 @@
                 <include>${basedir}/../activemq-kahadb-store/src/main/java</include>
                 <include>${basedir}/../activemq-mqtt/src/main/java</include>
                 <include>${basedir}/../activemq-stomp/src/main/java</include>
-                <include>${basedir}/../activemq-partition/src/main/java</include>
                 <include>${basedir}/../activemq-runtime-config/src/main/java</include>
               </includes>
               <strictXsdOrder>false</strictXsdOrder>
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index bbe633c..6184efb 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -60,10 +60,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-partition</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-runtime-config</artifactId>
     </dependency>
     <dependency>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
deleted file mode 100644
index dcf4e69..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker.partition;
-
-import junit.framework.TestCase;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.partition.PartitionBrokerPlugin;
-import org.apache.activemq.partition.dto.Partitioning;
-
-/**
- */
-public class SpringPartitionBrokerTest extends TestCase {
-
-    public void testCreatePartitionBroker() throws Exception {
-
-        BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml");
-        assertEquals(1, broker.getPlugins().length);
-        PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0];
-        Partitioning config = plugin.getConfig();
-        assertEquals(2,  config.getBrokers().size());
-
-        Object o;
-        String json = "{\n" +
-        "  \"by_client_id\":{\n" +
-        "    \"client1\":{\"ids\":[\"broker1\"]},\n" +
-        "    \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" +
-        "  },\n" +
-        "  \"brokers\":{\n" +
-        "    \"broker1\":\"tcp://localhost:61616\",\n" +
-        "    \"broker2\":\"tcp://localhost:61616\"\n" +
-        "  }\n" +
-        "}";
-        Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class);
-        assertEquals(expected.toString(), config.toString());
-
-    }
-
-}
diff --git a/activemq-unit-tests/src/test/resources/activemq-partition.xml b/activemq-unit-tests/src/test/resources/activemq-partition.xml
deleted file mode 100644
index 4bb96f2..0000000
--- a/activemq-unit-tests/src/test/resources/activemq-partition.xml
+++ /dev/null
@@ -1,58 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-   
-    http://www.apache.org/licenses/LICENSE-2.0
-   
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<!-- START SNIPPET: xbean -->
-<beans 
-  xmlns="http://www.springframework.org/schema/beans" 
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
- 
-  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
-
-  <bean id="config" class="java.lang.String">
-    <constructor-arg><value>
-    <![CDATA[
-      {
-        "by_client_id":{
-          "client1":{"ids":["broker1"]},
-          "client2":{"ids":["broker1","broker2"]}
-        },
-        "brokers":{
-          "broker1":"tcp://localhost:61616",
-          "broker2":"tcp://localhost:61616"
-        }
-      }
-    ]]>
-    </value></constructor-arg>
-  </bean>
-
-  <broker useJmx="false"  xmlns="http://activemq.apache.org/schema/core" persistent="false">
-
-    <plugins>
-      <partitionBrokerPlugin minTransferCount="5" configAsJson="#config"/>
-    </plugins>
-
-    <transportConnectors>
-      <transportConnector uri="tcp://localhost:61616"/>
-    </transportConnectors>
-        
-  </broker>
-  
-</beans>
-<!-- END SNIPPET: xbean -->
diff --git a/assembly/pom.xml b/assembly/pom.xml
index a5d2b0c..d1d30d4 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -55,10 +55,6 @@
       <type>test-jar</type>
     </dependency>
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>activemq-partition</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.activemq.tooling</groupId>
       <artifactId>activemq-junit</artifactId>
     </dependency>
@@ -68,19 +64,6 @@
       <artifactId>hawtdispatch-transport</artifactId>
       <version>${hawtdispatch-version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.zookeeper-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.linkedin</groupId>
-      <artifactId>org.linkedin.util-core</artifactId>
-      <version>${linkedin-zookeeper-version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-    </dependency>
 
     <dependency>
       <groupId>org.osgi</groupId>
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index f77fa1c..ff9c509 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,7 +182,6 @@
         <include>${pom.groupId}:activemq-log4j-appender</include>
         <include>${pom.groupId}:activemq-jms-pool</include>
         <include>${pom.groupId}:activemq-pool</include>
-        <include>${pom.groupId}:activemq-partition</include>
         <include>${pom.groupId}:activemq-shiro</include>
         <include>commons-beanutils:commons-beanutils</include>
         <include>commons-collections:commons-collections</include>
diff --git a/pom.xml b/pom.xml
index 325985d..4d02cf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,8 +90,6 @@
     <mqtt-client-version>1.16</mqtt-client-version>
     <org-apache-derby-version>10.15.2.0</org-apache-derby-version>
     <osgi-version>6.0.0</osgi-version>
-    <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
-    <zookeeper-version>3.4.14</zookeeper-version>
     <qpid-proton-version>0.33.10</qpid-proton-version>
     <qpid-jms-version>1.6.0</qpid-jms-version>
     <netty-version>4.1.75.Final</netty-version>
@@ -229,7 +227,6 @@
     <module>activemq-runtime-config</module>
     <module>activemq-tooling</module>
     <module>activemq-web</module>
-    <module>activemq-partition</module>
     <module>activemq-osgi</module>
     <module>activemq-blueprint</module>
     <module>activemq-web-demo</module>
@@ -316,11 +313,6 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.activemq</groupId>
-        <artifactId>activemq-partition</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-      <dependency>
         <groupId>org.apache.activemq.tooling</groupId>
         <artifactId>activemq-junit</artifactId>
         <version>${project.version}</version>
@@ -585,65 +577,6 @@
         <version>${pax-logging-version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.hadoop.zookeeper</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>${zookeeper-version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.zookeeper</groupId>
-        <artifactId>zookeeper</artifactId>
-        <version>${zookeeper-version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.linkedin</groupId>
-        <artifactId>org.linkedin.zookeeper-impl</artifactId>
-        <version>${linkedin-zookeeper-version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.json</groupId>
-            <artifactId>json</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.linkedin</groupId>
-        <artifactId>org.linkedin.util-core</artifactId>
-        <version>${linkedin-zookeeper-version}</version>
-      </dependency>
-
       <!-- zeroconf transport -->
       <dependency>
         <groupId>org.jmdns</groupId>