Merge branch 'S4-114' into dev
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index 9c37b4d..c557413 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,6 +31,7 @@
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
 import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
 import org.apache.s4.comm.tcp.RemoteEmitters;
 import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.ClusterFromZK;
@@ -41,6 +42,7 @@
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
@@ -88,11 +90,11 @@
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
 
-        bind(Cluster.class).to(ClusterFromZK.class);
+        bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
 
-        bind(Clusters.class).to(ClustersFromZK.class);
+        bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
 
-        bind(RemoteEmitters.class);
+        bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
 
@@ -106,14 +108,13 @@
                     .getString("s4.comm.emitter.remote.class"));
             install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
                     RemoteEmitterFactory.class));
-            bind(RemoteEmitters.class);
+            bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
 
         } catch (ClassNotFoundException e) {
             logger.error("Cannot find class implementation ", e);
         }
     }
 
-    @SuppressWarnings("serial")
     private void loadProperties(Binder binder) {
         try {
             config = new PropertiesConfiguration();
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
new file mode 100644
index 0000000..151ad85
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.tcp;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class DefaultRemoteEmitters implements RemoteEmitters {
+
+    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster, RemoteEmitter>();
+
+    @Inject
+    RemoteEmitterFactory emitterFactory;
+
+    /**
+     * Returns the emitter for the given cluster, creating it through the injected factory on first use.
+     * 
+     * @see org.apache.s4.comm.tcp.RemoteEmitters#getEmitter(org.apache.s4.comm.topology.Cluster)
+     */
+    @Override
+    public RemoteEmitter getEmitter(Cluster topology) {
+        RemoteEmitter cached = emitters.get(topology);
+        if (cached != null) {
+            return cached;
+        }
+        RemoteEmitter created = emitterFactory.createRemoteEmitter(topology);
+        RemoteEmitter winner = emitters.putIfAbsent(topology, created);
+        if (winner == null) {
+            return created;
+        }
+        // an emitter was registered for this cluster in the meantime: keep that one and close ours
+        created.close();
+        return winner;
+    }
+}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index 4b3040d..5cfd3d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -1,57 +1,14 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.comm.tcp;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.comm.RemoteEmitterFactory;
 import org.apache.s4.comm.topology.Cluster;
 
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
 /**
  * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
  * 
  */
-@Singleton
-public class RemoteEmitters {
+public interface RemoteEmitters {
 
-    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster, RemoteEmitter>();
+    public abstract RemoteEmitter getEmitter(Cluster topology);
 
-    @Inject
-    RemoteEmitterFactory emitterFactory;
-
-    public RemoteEmitter getEmitter(Cluster topology) {
-        RemoteEmitter emitter = emitters.get(topology);
-        if (emitter == null) {
-            RemoteEmitter newEmitter = emitterFactory.createRemoteEmitter(topology);
-            emitter = emitters.putIfAbsent(topology, newEmitter);
-            if (emitter == null) {
-                emitter = newEmitter;
-            } else {
-                // use the existing emitter instead
-                newEmitter.close();
-            }
-        }
-        return emitter;
-    }
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..78d4f69 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@
 
 /**
  * Handles partition assignment through Zookeeper.
- *
+ * 
  */
 @Singleton
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
@@ -85,15 +85,13 @@
      * Holds the reference to ClusterNode which points to the current partition owned
      */
     AtomicReference<ClusterNode> clusterNodeRef;
-    private int connectionTimeout;
-    private String clusterName;
+    private final int connectionTimeout;
+    private final String clusterName;
 
     // TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
     @Inject
     public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
         taskPath = "/s4/clusters/" + clusterName + "/tasks";
@@ -110,9 +108,7 @@
             machineId = "UNKNOWN";
         }
 
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
     }
 
     @Inject
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 6bda6b4..45e0ae4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,7 +30,6 @@
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
  * configuration.
- *
+ * 
  */
 public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
 
@@ -53,23 +52,19 @@
     private final String taskPath;
     private final String processPath;
     private final Lock lock;
-    private String clusterName;
+    private final String clusterName;
 
     /**
      * only the local topology
      */
     @Inject
     public ClusterFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
         this.processPath = "/s4/clusters/" + clusterName + "/process";
         lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
         zkClient.subscribeStateChanges(this);
         if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
             throw new Exception("cannot connect to zookeeper");
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index 86f6bb5..b9fde5e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -46,21 +46,17 @@
     private final ZkClient zkClient;
     private final Lock lock;
     private String machineId;
-    private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
-    private int connectionTimeout;
-    private String clusterName;
+    private final Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+    private final int connectionTimeout;
+    private final String clusterName;
 
     @Inject
     public ClustersFromZK(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
         lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
+        this.zkClient = zkClient;
         zkClient.subscribeStateChanges(this);
         zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
         try {
@@ -115,6 +111,7 @@
         doProcess();
     }
 
+    @Override
     public Cluster getCluster(String clusterName) {
         return clusters.get(clusterName);
     }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index b1fe6b7..9b9be90 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -1,43 +1,6 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.comm.topology;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
 
 /**
  * <p>
@@ -51,165 +14,12 @@
  * </p>
  * 
  */
-@Singleton
-public class RemoteStreams implements IZkStateListener, IZkChildListener {
 
-    private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
-    private KeeperState state;
-    private final ZkClient zkClient;
-    private final Lock lock;
-    private final static String STREAMS_PATH = "/s4/streams";
-    // by stream name, then "producer"|"consumer" then
-    private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+public interface RemoteStreams {
 
-    public enum StreamType {
-        PRODUCER, CONSUMER;
+    public abstract Set<StreamConsumer> getConsumers(String streamName);
 
-        public String getPath(String streamName) {
-            switch (this) {
-                case PRODUCER:
-                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
-                case CONSUMER:
-                    return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
-                default:
-                    throw new RuntimeException("Invalid path in enum StreamType");
-            }
-        }
-
-        public String getCollectionName() {
-            switch (this) {
-                case PRODUCER:
-                    return "producers";
-                case CONSUMER:
-                    return "consumers";
-                default:
-                    throw new RuntimeException("Invalid path in enum StreamType");
-            }
-        }
-    }
-
-    @Inject
-    public RemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-
-        lock = new ReentrantLock();
-        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
-        ZkSerializer serializer = new ZNRecordSerializer();
-        zkClient.setZkSerializer(serializer);
-        zkClient.subscribeStateChanges(this);
-        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
-        // bug in zkClient, it does not invoke handleNewSession the first time
-        // it connects
-        this.handleStateChanged(KeeperState.SyncConnected);
-
-        this.handleNewSession();
-
-    }
-
-    public Set<StreamConsumer> getConsumers(String streamName) {
-        if (!streams.containsKey(streamName)) {
-            return Collections.emptySet();
-        } else {
-            return streams.get(streamName).get("consumers");
-        }
-    }
-
-    /**
-     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
-     */
-    private void doProcess() {
-        lock.lock();
-        try {
-            refreshStreams();
-        } catch (Exception e) {
-            logger.warn("Exception in tryToAcquireTask", e);
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    @Override
-    public void handleStateChanged(KeeperState state) throws Exception {
-        this.state = state;
-        if (state.equals(KeeperState.Expired)) {
-            logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
-            System.exit(1);
-        }
-
-    }
-
-    @Override
-    public void handleNewSession() throws Exception {
-        logger.info("New session:" + zkClient.getSessionId());
-        zkClient.subscribeChildChanges(STREAMS_PATH, this);
-
-        doProcess();
-    }
-
-    @Override
-    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
-        doProcess();
-    }
-
-    private void refreshStreams() {
-        List<String> children = zkClient.getChildren(STREAMS_PATH);
-        for (String streamName : children) {
-            if (!streams.containsKey(streamName)) {
-                logger.info("Detected new stream [{}]", streamName);
-                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
-                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
-                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
-                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
-            }
-
-            update(streamName, StreamType.PRODUCER);
-            update(streamName, StreamType.CONSUMER);
-        }
-    }
-
-    private void update(String streamName, StreamType type) {
-        List<String> elements = zkClient.getChildren(type.getPath(streamName));
-        Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
-        for (String element : elements) {
-            ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
-            if (producerData != null) {
-                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
-                        producerData.getSimpleField("clusterName"));
-                consumers.add(consumer);
-            }
-        }
-        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
-    }
-
-    public void addOutputStream(String appId, String clusterName, String streamName) {
-        lock.lock();
-        try {
-            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
-                    clusterName });
-            createStreamPaths(streamName);
-            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            producer.putSimpleField("appId", appId);
-            producer.putSimpleField("clusterName", clusterName);
-            try {
-                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
-            } catch (Throwable e) {
-                logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
-                        new String[] { streamName, appId, clusterName, e.getMessage() });
-            }
-            refreshStreams();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Creates (it they don't exist yet) persistent znodes for producers and consumers of a stream.
-     */
-    private void createStreamPaths(String streamName) {
-        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
-        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
-    }
+    public abstract void addOutputStream(String appId, String clusterName, String streamName);
 
     /**
      * Publishes interest in a stream from an application.
@@ -218,25 +28,6 @@
      * @param clusterName
      * @param streamName
      */
-    public void addInputStream(int appId, String clusterName, String streamName) {
-        lock.lock();
-        try {
-            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
-                    new String[] { streamName, String.valueOf(appId), clusterName });
-            createStreamPaths(streamName);
-            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            consumer.putSimpleField("appId", String.valueOf(appId));
-            consumer.putSimpleField("clusterName", clusterName);
-            try {
-                // NOTE: We create 1 sequential znode per consumer node instance
-                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
-            } catch (Throwable e) {
-                logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
-                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
-            }
-            refreshStreams();
-        } finally {
-            lock.unlock();
-        }
-    }
+    public abstract void addInputStream(int appId, String clusterName, String streamName);
+
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
new file mode 100644
index 0000000..0855fc0
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * {@link RemoteStreams} implementation that publishes producers and consumers of streams as znodes below
+ * <code>/s4/streams</code> and keeps a local view of them, refreshed from the ZooKeeper callbacks handled here.
+ */
+@Singleton
+public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZkRemoteStreams.class);
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final Lock lock;
+    private final static String STREAMS_PATH = "/s4/streams";
+    // by stream name, then by collection name ("producers" | "consumers"): the entries read from ZooKeeper
+    private final Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+
+    /** The two collections of znodes kept below each stream. */
+    public enum StreamType {
+        PRODUCER, CONSUMER;
+
+        /** Path of the znode holding the entries of this type for the given stream. */
+        public String getPath(String streamName) {
+            return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+        }
+
+        /** Name of the znode holding the entries of this type. */
+        public String getCollectionName() {
+            switch (this) {
+                case PRODUCER:
+                    return "producers";
+                case CONSUMER:
+                    return "consumers";
+                default:
+                    throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+    }
+
+    /**
+     * Subscribes to state changes, waits for the connection (at most connectionTimeout ms), then loads the streams.
+     */
+    @Inject
+    public ZkRemoteStreams(@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient)
+            throws Exception {
+
+        lock = new ReentrantLock();
+        this.zkClient = zkClient;
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+
+    }
+
+    /**
+     * Returns the consumers known for a stream; an empty set, never null, when none were loaded for that stream.
+     * 
+     * @see org.apache.s4.comm.topology.RemoteStreams#getConsumers(java.lang.String)
+     */
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        Map<String, Set<StreamConsumer>> collections = streams.get(streamName);
+        // the consumers may be missing for a registered stream, e.g. when a refresh failed before reading them
+        Set<StreamConsumer> consumers = (collections == null) ? null : collections.get("consumers");
+        return (consumers == null) ? Collections.<StreamConsumer> emptySet() : consumers;
+    }
+
+    /**
+     * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
+     */
+    private void doProcess() {
+        lock.lock();
+        try {
+            refreshStreams();
+        } catch (Exception e) {
+            logger.warn("Exception while refreshing streams", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+        if (state.equals(KeeperState.Expired)) {
+            logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+            System.exit(1);
+        }
+
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:" + zkClient.getSessionId());
+        zkClient.subscribeChildChanges(STREAMS_PATH, this);
+
+        doProcess();
+    }
+
+    @Override
+    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+        doProcess();
+    }
+
+    /** Re-reads the streams from ZooKeeper and subscribes to the producers/consumers of newly seen streams. */
+    private void refreshStreams() {
+        List<String> children = zkClient.getChildren(STREAMS_PATH);
+        for (String streamName : children) {
+            if (!streams.containsKey(streamName)) {
+                logger.info("Detected new stream [{}]", streamName);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
+                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
+            }
+
+            update(streamName, StreamType.PRODUCER);
+            update(streamName, StreamType.CONSUMER);
+        }
+    }
+
+    /** Reloads one collection (producers or consumers) of a stream and stores it as an unmodifiable set. */
+    private void update(String streamName, StreamType type) {
+        String collectionPath = type.getPath(streamName);
+        Set<StreamConsumer> entries = new HashSet<StreamConsumer>();
+        for (String element : zkClient.getChildren(collectionPath)) {
+            ZNRecord record = zkClient.readData(collectionPath + "/" + element, true);
+            if (record != null) {
+                entries.add(new StreamConsumer(Integer.valueOf(record.getSimpleField("appId")), record
+                        .getSimpleField("clusterName")));
+            }
+        }
+        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(entries));
+    }
+
+    /**
+     * Publishes an application as a producer of a stream, then refreshes the local view of the streams.
+     * 
+     * @see RemoteStreams#addOutputStream(String, String, String)
+     */
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
+                    clusterName });
+            createStreamPaths(streamName);
+            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            producer.putSimpleField("appId", appId);
+            producer.putSimpleField("clusterName", clusterName);
+            try {
+                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, appId, clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Creates (if they don't exist yet) persistent znodes for producers and consumers of a stream.
+     */
+    private void createStreamPaths(String streamName) {
+        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
+        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
+    }
+
+    /**
+     * Publishes interest in a stream from an application, then refreshes the local view of the streams.
+     * @see RemoteStreams#addInputStream(int, String, String)
+     */
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
+                    new String[] { streamName, String.valueOf(appId), clusterName });
+            createStreamPaths(streamName);
+            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            consumer.putSimpleField("appId", String.valueOf(appId));
+            consumer.putSimpleField("clusterName", clusterName);
+            try {
+                // NOTE: We create 1 sequential znode per consumer node instance
+                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 8b1ff9a..472e912 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -64,7 +64,9 @@
                     try {
 
                         for (String topologyName : names) {
-                            assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+                            zkClient.setZkSerializer(new ZNRecordSerializer());
+                            assignmentFromZK = new AssignmentFromZK(topologyName, 30000, zkClient);
                             assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index f17bf26..8f692bc 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -56,11 +56,13 @@
             InterruptedException {
         final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
         taskSetup.clean("s4");
+
         for (String clusterName : clusterNames) {
             taskSetup.setup(clusterName, 10, 1300);
         }
-
-        final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+        ZkClient zkClient1 = new ZkClient(CommTestUtils.ZK_STRING);
+        zkClient1.setZkSerializer(new ZNRecordSerializer());
+        final ClustersFromZK clusterFromZK = new ClustersFromZK(null, 30000, zkClient1);
 
         final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
         for (final String clusterName : clusterNames) {
@@ -87,7 +89,9 @@
                     AssignmentFromZK assignmentFromZK;
                     try {
                         for (String clusterName : clusterNames) {
-                            assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+                            ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+                            zkClient.setZkSerializer(new ZNRecordSerializer());
+                            assignmentFromZK = new AssignmentFromZK(clusterName, 30000, zkClient);
                             assignmentFromZK.init();
                             ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
                             latch.countDown();
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index c1fb253..827ec42 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,6 +5,8 @@
 import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
 
@@ -31,8 +33,11 @@
         bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
         bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
         bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
-        // bind(Cluster.class).to(ClusterFromZK.class);
 
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+
+        ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        bind(ZkClient.class).toInstance(zkClient);
     }
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index df2d8f1..582f5d0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -8,6 +8,7 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.s4.comm.topology.Assignment;
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.s4.comm.util.ArchiveFetcher;
 import org.apache.s4.comm.util.RemoteFileFetcher;
 import org.slf4j.Logger;
@@ -15,6 +16,7 @@
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
+import com.google.inject.Scopes;
 import com.google.inject.name.Names;
 
 public class BaseModule extends AbstractModule {
@@ -45,6 +47,9 @@
         bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
         bind(S4Bootstrap.class);
 
+        // share the Zookeeper connection
+        bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+
     }
 
     @SuppressWarnings("serial")
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 7789b9f..c4cafee 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -28,6 +28,8 @@
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
@@ -45,6 +47,7 @@
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
+import com.google.inject.Scopes;
 import com.google.inject.name.Named;
 import com.google.inject.name.Names;
 
@@ -81,7 +84,7 @@
         /* The hashing function to map keys top partitions. */
         bind(Hasher.class).to(DefaultHasher.class);
 
-        bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+        bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
 
         bind(S4RLoaderFactory.class);
 
@@ -95,6 +98,9 @@
 
         bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
 
+        bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
     }
 
     @Provides
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
new file mode 100644
index 0000000..6aaa8f1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+public class DefaultRemoteSenders implements RemoteSenders {
+
+    Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
+
+    final RemoteEmitters remoteEmitters;
+
+    final RemoteStreams remoteStreams;
+
+    final Clusters remoteClusters;
+
+    final SerializerDeserializer serDeser;
+
+    final Hasher hasher;
+
+    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
+
+    private final ExecutorService executorService;
+
+    @Inject
+    public DefaultRemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
+        this.remoteEmitters = remoteEmitters;
+        this.remoteStreams = remoteStreams;
+        this.remoteClusters = remoteClusters;
+        this.hasher = hasher;
+        executorService = senderExecutorFactory.create();
+
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
+     */
+    @Override
+    public void send(String hashKey, Event event) {
+
+        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
+        event.setAppId(-1);
+        for (StreamConsumer consumer : consumers) {
+            // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
+            // represented by a single stream consumer
+            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
+            if (sender == null) {
+                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
+                        .getClusterName())), hasher, consumer.getClusterName());
+                // TODO cleanup when remote topologies die
+                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
+                if (sender == null) {
+                    sender = newSender;
+                }
+            }
+            // NOTE: this implies multiple serializations, there might be an optimization
+            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
+        }
+    }
+
+    class SendToRemoteClusterTask implements Runnable {
+
+        String hashKey;
+        Event event;
+        RemoteSender sender;
+
+        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
+            super();
+            this.hashKey = hashKey;
+            this.event = event;
+            this.sender = sender;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sender.send(hashKey, serDeser.serialize(event));
+            } catch (InterruptedException e) {
+                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
+                Thread.currentThread().interrupt();
+            }
+
+        }
+
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 7c7238f..9222450 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -1,123 +1,14 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.s4.core;
 
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
 import org.apache.s4.base.Event;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.StreamConsumer;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
 
 /**
  * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
  * the event.
  * 
  */
-public class RemoteSenders {
+public interface RemoteSenders {
 
-    Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
+    public abstract void send(String hashKey, Event event);
 
-    final RemoteEmitters remoteEmitters;
-
-    final RemoteStreams remoteStreams;
-
-    final Clusters remoteClusters;
-
-    final SerializerDeserializer serDeser;
-
-    final Hasher hasher;
-
-    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
-
-    private final ExecutorService executorService;
-
-    @Inject
-    public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
-            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
-            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
-        this.remoteEmitters = remoteEmitters;
-        this.remoteStreams = remoteStreams;
-        this.remoteClusters = remoteClusters;
-        this.hasher = hasher;
-        executorService = senderExecutorFactory.create();
-
-        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
-    }
-
-    public void send(String hashKey, Event event) {
-
-        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
-        event.setAppId(-1);
-        for (StreamConsumer consumer : consumers) {
-            // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
-            // represented by a single stream consumer
-            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
-            if (sender == null) {
-                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
-                        .getClusterName())), hasher, consumer.getClusterName());
-                // TODO cleanup when remote topologies die
-                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
-                if (sender == null) {
-                    sender = newSender;
-                }
-            }
-            // NOTE: this implies multiple serializations, there might be an optimization
-            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
-        }
-    }
-
-    class SendToRemoteClusterTask implements Runnable {
-
-        String hashKey;
-        Event event;
-        RemoteSender sender;
-
-        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
-            super();
-            this.hashKey = hashKey;
-            this.event = event;
-            this.sender = sender;
-        }
-
-        @Override
-        public void run() {
-            try {
-                sender.send(hashKey, serDeser.serialize(event));
-            } catch (InterruptedException e) {
-                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
-                Thread.currentThread().interrupt();
-            }
-
-        }
-
-    }
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
new file mode 100644
index 0000000..19c4ecc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+
+/**
+ * Provides a connection to ZooKeeper through the {@link ZkClient} class.
+ * <p>
+ * The client is created once, in the constructor, from the injected ZooKeeper address and session/connection
+ * timeouts, and is configured with a {@link ZNRecordSerializer}; {@link #get()} always returns that same instance.
+ * <p>
+ * This connection can easily be shared by specifying singleton scope at binding time (i.e. when binding the ZkClient
+ * class, see {@link BaseModule}).
+ */
+public class ZkClientProvider implements Provider<ZkClient> {
+
+    private final ZkClient zkClient;
+
+    @Inject
+    public ZkClientProvider(@Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) {
+        zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer serializer = new ZNRecordSerializer();
+        zkClient.setZkSerializer(serializer);
+    }
+
+    @Override
+    public ZkClient get() {
+        return zkClient;
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
index e8b7e87..bcf88c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@
 
     @Inject
     public AssignmentFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
index 025fb75..0b48bbf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@
 
     @Inject
     public ClusterFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
index 4ea5644..0038b21 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -19,6 +19,7 @@
 package org.apache.s4.fixtures;
 
 import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.comm.topology.ZkClient;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 
 import com.google.inject.Inject;
@@ -28,10 +29,8 @@
 
     @Inject
     public ClustersFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
-            @Named("s4.cluster.zk_address") String zookeeperAddress,
-            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+        super(clusterName, connectionTimeout, zkClient);
     }
 
     @Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 1dadc5b..c4f1772 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -32,8 +32,6 @@
 import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.PhysicalCluster;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.RemoteSenders;
 import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
@@ -58,8 +56,6 @@
         /* Use Kryo to serialize events. */
         install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
                 SerializerDeserializerFactory.class));
-        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
-        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
         Cluster mockedCluster = Mockito.mock(Cluster.class);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index ae19011..7900ecc 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,12 +20,15 @@
 
 import org.apache.s4.comm.DeserializerExecutorFactory;
 import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.RemoteSenders;
 import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
 import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
 import org.apache.s4.core.staging.SenderExecutorServiceFactory;
 import org.apache.s4.core.staging.StreamExecutorServiceFactory;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +60,9 @@
         bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
         bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
 
+        bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+        bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+
         bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
         bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);