Merge branch 'S4-114' into dev
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index 9c37b4d..c557413 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -31,6 +31,7 @@
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
import org.apache.s4.comm.staging.BlockingDeserializerExecutorFactory;
+import org.apache.s4.comm.tcp.DefaultRemoteEmitters;
import org.apache.s4.comm.tcp.RemoteEmitters;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.ClusterFromZK;
@@ -41,6 +42,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
@@ -88,11 +90,11 @@
install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
SerializerDeserializerFactory.class));
- bind(Cluster.class).to(ClusterFromZK.class);
+ bind(Cluster.class).to(ClusterFromZK.class).in(Scopes.SINGLETON);
- bind(Clusters.class).to(ClustersFromZK.class);
+ bind(Clusters.class).to(ClustersFromZK.class).in(Scopes.SINGLETON);
- bind(RemoteEmitters.class);
+ bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
bind(DeserializerExecutorFactory.class).to(BlockingDeserializerExecutorFactory.class);
@@ -106,14 +108,13 @@
.getString("s4.comm.emitter.remote.class"));
install(new FactoryModuleBuilder().implement(RemoteEmitter.class, remoteEmitterClass).build(
RemoteEmitterFactory.class));
- bind(RemoteEmitters.class);
+ bind(RemoteEmitters.class).to(DefaultRemoteEmitters.class).in(Scopes.SINGLETON);
} catch (ClassNotFoundException e) {
logger.error("Cannot find class implementation ", e);
}
}
- @SuppressWarnings("serial")
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
new file mode 100644
index 0000000..151ad85
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.tcp;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.s4.base.RemoteEmitter;
+import org.apache.s4.comm.RemoteEmitterFactory;
+import org.apache.s4.comm.topology.Cluster;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/** Default {@link RemoteEmitters}: creates emitters on demand and keeps one {@link RemoteEmitter} per {@link Cluster}. */
+@Singleton
+public class DefaultRemoteEmitters implements RemoteEmitters {
+
+    ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster, RemoteEmitter>();
+
+    @Inject
+    RemoteEmitterFactory emitterFactory;
+
+    /**
+     * Returns the emitter registered for the given cluster, creating it through the factory when none exists yet.
+     * When another emitter gets registered for the same cluster in the meantime, the new one is closed.
+     */
+    @Override
+    public RemoteEmitter getEmitter(Cluster topology) {
+        RemoteEmitter cached = emitters.get(topology);
+        if (cached != null) {
+            return cached;
+        }
+        RemoteEmitter candidate = emitterFactory.createRemoteEmitter(topology);
+        RemoteEmitter registered = emitters.putIfAbsent(topology, candidate);
+        if (registered == null) {
+            return candidate;
+        }
+        // use the existing emitter instead
+        candidate.close();
+        return registered;
+    }
+}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
index 4b3040d..5cfd3d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
@@ -1,57 +1,14 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.s4.comm.tcp;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
import org.apache.s4.base.RemoteEmitter;
-import org.apache.s4.comm.RemoteEmitterFactory;
import org.apache.s4.comm.topology.Cluster;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
/**
* Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
*
*/
-@Singleton
-public class RemoteEmitters {
+public interface RemoteEmitters {
- ConcurrentMap<Cluster, RemoteEmitter> emitters = new ConcurrentHashMap<Cluster, RemoteEmitter>();
+ public abstract RemoteEmitter getEmitter(Cluster topology);
- @Inject
- RemoteEmitterFactory emitterFactory;
-
- public RemoteEmitter getEmitter(Cluster topology) {
- RemoteEmitter emitter = emitters.get(topology);
- if (emitter == null) {
- RemoteEmitter newEmitter = emitterFactory.createRemoteEmitter(topology);
- emitter = emitters.putIfAbsent(topology, newEmitter);
- if (emitter == null) {
- emitter = newEmitter;
- } else {
- // use the existing emitter instead
- newEmitter.close();
- }
- }
- return emitter;
- }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..78d4f69 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@
/**
* Handles partition assignment through Zookeeper.
- *
+ *
*/
@Singleton
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
@@ -85,15 +85,13 @@
* Holds the reference to ClusterNode which points to the current partition owned
*/
AtomicReference<ClusterNode> clusterNodeRef;
- private int connectionTimeout;
- private String clusterName;
+ private final int connectionTimeout;
+ private final String clusterName;
// TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
@Inject
public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
taskPath = "/s4/clusters/" + clusterName + "/tasks";
@@ -110,9 +108,7 @@
machineId = "UNKNOWN";
}
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
}
@Inject
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 6bda6b4..45e0ae4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -30,7 +30,6 @@
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
* configuration.
- *
+ *
*/
public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
@@ -53,23 +52,19 @@
private final String taskPath;
private final String processPath;
private final Lock lock;
- private String clusterName;
+ private final String clusterName;
/**
* only the local topology
*/
@Inject
public ClusterFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
this.processPath = "/s4/clusters/" + clusterName + "/process";
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
throw new Exception("cannot connect to zookeeper");
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index 86f6bb5..b9fde5e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -46,21 +46,17 @@
private final ZkClient zkClient;
private final Lock lock;
private String machineId;
- private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
- private int connectionTimeout;
- private String clusterName;
+ private final Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+ private final int connectionTimeout;
+ private final String clusterName;
@Inject
public ClustersFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
try {
@@ -115,6 +111,7 @@
doProcess();
}
+ @Override
public Cluster getCluster(String clusterName) {
return clusters.get(clusterName);
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index b1fe6b7..9b9be90 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -1,43 +1,6 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.s4.comm.topology;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.google.inject.name.Named;
/**
* <p>
@@ -51,165 +14,12 @@
* </p>
*
*/
-@Singleton
-public class RemoteStreams implements IZkStateListener, IZkChildListener {
- private static final Logger logger = LoggerFactory.getLogger(ClustersFromZK.class);
- private KeeperState state;
- private final ZkClient zkClient;
- private final Lock lock;
- private final static String STREAMS_PATH = "/s4/streams";
- // by stream name, then "producer"|"consumer" then
- private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+public interface RemoteStreams {
- public enum StreamType {
- PRODUCER, CONSUMER;
+ public abstract Set<StreamConsumer> getConsumers(String streamName);
- public String getPath(String streamName) {
- switch (this) {
- case PRODUCER:
- return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
- case CONSUMER:
- return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
- default:
- throw new RuntimeException("Invalid path in enum StreamType");
- }
- }
-
- public String getCollectionName() {
- switch (this) {
- case PRODUCER:
- return "producers";
- case CONSUMER:
- return "consumers";
- default:
- throw new RuntimeException("Invalid path in enum StreamType");
- }
- }
- }
-
- @Inject
- public RemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
-
- lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
- zkClient.subscribeStateChanges(this);
- zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
- // bug in zkClient, it does not invoke handleNewSession the first time
- // it connects
- this.handleStateChanged(KeeperState.SyncConnected);
-
- this.handleNewSession();
-
- }
-
- public Set<StreamConsumer> getConsumers(String streamName) {
- if (!streams.containsKey(streamName)) {
- return Collections.emptySet();
- } else {
- return streams.get(streamName).get("consumers");
- }
- }
-
- /**
- * One method to do any processing if there is a change in ZK, all callbacks will be processed sequentially
- */
- private void doProcess() {
- lock.lock();
- try {
- refreshStreams();
- } catch (Exception e) {
- logger.warn("Exception in tryToAcquireTask", e);
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- this.state = state;
- if (state.equals(KeeperState.Expired)) {
- logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
- System.exit(1);
- }
-
- }
-
- @Override
- public void handleNewSession() throws Exception {
- logger.info("New session:" + zkClient.getSessionId());
- zkClient.subscribeChildChanges(STREAMS_PATH, this);
-
- doProcess();
- }
-
- @Override
- public void handleChildChange(String paramString, List<String> paramList) throws Exception {
- doProcess();
- }
-
- private void refreshStreams() {
- List<String> children = zkClient.getChildren(STREAMS_PATH);
- for (String streamName : children) {
- if (!streams.containsKey(streamName)) {
- logger.info("Detected new stream [{}]", streamName);
- streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
- zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
- zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
- streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
- }
-
- update(streamName, StreamType.PRODUCER);
- update(streamName, StreamType.CONSUMER);
- }
- }
-
- private void update(String streamName, StreamType type) {
- List<String> elements = zkClient.getChildren(type.getPath(streamName));
- Set<StreamConsumer> consumers = new HashSet<StreamConsumer>();
- for (String element : elements) {
- ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
- if (producerData != null) {
- StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
- producerData.getSimpleField("clusterName"));
- consumers.add(consumer);
- }
- }
- streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
- }
-
- public void addOutputStream(String appId, String clusterName, String streamName) {
- lock.lock();
- try {
- logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
- clusterName });
- createStreamPaths(streamName);
- ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- producer.putSimpleField("appId", appId);
- producer.putSimpleField("clusterName", clusterName);
- try {
- zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
- } catch (Throwable e) {
- logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, appId, clusterName, e.getMessage() });
- }
- refreshStreams();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Creates (it they don't exist yet) persistent znodes for producers and consumers of a stream.
- */
- private void createStreamPaths(String streamName) {
- zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
- zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
- }
+ public abstract void addOutputStream(String appId, String clusterName, String streamName);
/**
* Publishes interest in a stream from an application.
@@ -218,25 +28,6 @@
* @param clusterName
* @param streamName
*/
- public void addInputStream(int appId, String clusterName, String streamName) {
- lock.lock();
- try {
- logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
- new String[] { streamName, String.valueOf(appId), clusterName });
- createStreamPaths(streamName);
- ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- consumer.putSimpleField("appId", String.valueOf(appId));
- consumer.putSimpleField("clusterName", clusterName);
- try {
- // NOTE: We create 1 sequential znode per consumer node instance
- zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
- } catch (Throwable e) {
- logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
- }
- refreshStreams();
- } finally {
- lock.unlock();
- }
- }
+ public abstract void addInputStream(int appId, String clusterName, String streamName);
+
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
new file mode 100644
index 0000000..0855fc0
--- /dev/null
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.comm.topology;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.google.inject.name.Named;
+
+/**
+ * ZooKeeper-backed {@link RemoteStreams}: mirrors the producers and consumers registered under
+ * <code>/s4/streams</code> into a local map and reloads that map whenever ZooKeeper reports a change.
+ */
+@Singleton
+public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams {
+
+    private static final Logger logger = LoggerFactory.getLogger(ZkRemoteStreams.class);
+    private KeeperState state;
+    private final ZkClient zkClient;
+    private final Lock lock;
+    private final static String STREAMS_PATH = "/s4/streams";
+    // by stream name, then by collection name ("producers" | "consumers")
+    private final Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+
+    /** Kind of endpoint registered for a stream; each kind is stored in its own znode collection. */
+    public enum StreamType {
+        PRODUCER, CONSUMER;
+
+        /** Returns the znode path that holds the endpoints of this kind for the given stream. */
+        public String getPath(String streamName) {
+            // the layout is the same for both kinds, only the collection name differs
+            return STREAMS_PATH + "/" + streamName + "/" + getCollectionName();
+        }
+
+        /** Returns the name of the child znode that groups the endpoints of this kind. */
+        public String getCollectionName() {
+            switch (this) {
+            case PRODUCER:
+                return "producers";
+            case CONSUMER:
+                return "consumers";
+            default:
+                throw new RuntimeException("Invalid path in enum StreamType");
+            }
+        }
+    }
+
+    /**
+     * Subscribes to connection state changes, waits for the connection, then loads the registered streams.
+     *
+     * @param connectionTimeout maximum time to wait for the ZooKeeper connection, in milliseconds
+     * @param zkClient client used for all ZooKeeper reads, writes and subscriptions
+     */
+    @Inject
+    public ZkRemoteStreams(@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient)
+            throws Exception {
+        lock = new ReentrantLock();
+        this.zkClient = zkClient;
+        zkClient.subscribeStateChanges(this);
+        zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
+        // bug in zkClient, it does not invoke handleNewSession the first time
+        // it connects
+        this.handleStateChanged(KeeperState.SyncConnected);
+
+        this.handleNewSession();
+    }
+
+    /**
+     * Returns the consumers currently known for a stream.
+     *
+     * @return the registered consumers, or an empty set when the stream is unknown or its consumers have not been
+     *         loaded; never <code>null</code>
+     */
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        // NOTE(review): read without holding the lock, as before - confirm callers tolerate a slightly stale view
+        Map<String, Set<StreamConsumer>> endpoints = streams.get(streamName);
+        if (endpoints == null) {
+            return Collections.emptySet();
+        }
+        Set<StreamConsumer> consumers = endpoints.get(StreamType.CONSUMER.getCollectionName());
+        if (consumers == null) {
+            return Collections.emptySet();
+        }
+        return consumers;
+    }
+
+    /** Single entry point for ZooKeeper callbacks: reloads the streams under the lock so callbacks run sequentially. */
+    private void doProcess() {
+        lock.lock();
+        try {
+            refreshStreams();
+        } catch (Exception e) {
+            logger.warn("Exception while refreshing streams", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Records the new connection state; an expired session is treated as fatal and stops the JVM. */
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        this.state = state;
+        if (state.equals(KeeperState.Expired)) {
+            logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+            System.exit(1);
+        }
+    }
+
+    /** Subscribes to child changes on the streams root and reloads all streams. */
+    @Override
+    public void handleNewSession() throws Exception {
+        logger.info("New session:" + zkClient.getSessionId());
+        zkClient.subscribeChildChanges(STREAMS_PATH, this);
+
+        doProcess();
+    }
+
+    /** Reloads all streams when ZooKeeper reports a child change on a subscribed path. */
+    @Override
+    public void handleChildChange(String paramString, List<String> paramList) throws Exception {
+        doProcess();
+    }
+
+    /** Reloads the producers and consumers of every stream, subscribing to streams seen for the first time. */
+    private void refreshStreams() {
+        List<String> children = zkClient.getChildren(STREAMS_PATH);
+        for (String streamName : children) {
+            if (!streams.containsKey(streamName)) {
+                logger.info("Detected new stream [{}]", streamName);
+                streams.put(streamName, new HashMap<String, Set<StreamConsumer>>());
+                zkClient.subscribeChildChanges(StreamType.PRODUCER.getPath(streamName), this);
+                zkClient.subscribeChildChanges(StreamType.CONSUMER.getPath(streamName), this);
+            }
+
+            update(streamName, StreamType.PRODUCER);
+            update(streamName, StreamType.CONSUMER);
+        }
+    }
+
+    /** Replaces the cached set for one stream and endpoint kind with the entries currently readable in ZooKeeper. */
+    private void update(String streamName, StreamType type) {
+        List<String> elements = zkClient.getChildren(type.getPath(streamName));
+        Set<StreamConsumer> endpoints = new HashSet<StreamConsumer>();
+        for (String element : elements) {
+            ZNRecord record = zkClient.readData(type.getPath(streamName) + "/" + element, true);
+            if (record != null) {
+                endpoints.add(new StreamConsumer(Integer.valueOf(record.getSimpleField("appId")),
+                        record.getSimpleField("clusterName")));
+            }
+        }
+        streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(endpoints));
+    }
+
+    /** Registers an application as a producer of the stream, then reloads the cached streams. */
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
+                    clusterName });
+            createStreamPaths(streamName);
+            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            producer.putSimpleField("appId", appId);
+            producer.putSimpleField("clusterName", clusterName);
+            try {
+                zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, appId, clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Creates (if they don't exist yet) persistent znodes for producers and consumers of a stream. */
+    private void createStreamPaths(String streamName) {
+        zkClient.createPersistent(StreamType.PRODUCER.getPath(streamName), true);
+        zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
+    }
+
+    /** Registers an application as a consumer of the stream, then reloads the cached streams. */
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+        lock.lock();
+        try {
+            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
+                    new String[] { streamName, String.valueOf(appId), clusterName });
+            createStreamPaths(streamName);
+            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
+            consumer.putSimpleField("appId", String.valueOf(appId));
+            consumer.putSimpleField("clusterName", clusterName);
+            try {
+                // NOTE: We create 1 sequential znode per consumer node instance
+                zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
+            } catch (Throwable e) {
+                logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
+                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+            }
+            refreshStreams();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 8b1ff9a..472e912 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -64,7 +64,9 @@
try {
for (String topologyName : names) {
- assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(topologyName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
index f17bf26..8f692bc 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
@@ -56,11 +56,13 @@
InterruptedException {
final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
taskSetup.clean("s4");
+
for (String clusterName : clusterNames) {
taskSetup.setup(clusterName, 10, 1300);
}
-
- final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient1 = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient1.setZkSerializer(new ZNRecordSerializer());
+ final ClustersFromZK clusterFromZK = new ClustersFromZK(null, 30000, zkClient1);
final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
for (final String clusterName : clusterNames) {
@@ -87,7 +89,9 @@
AssignmentFromZK assignmentFromZK;
try {
for (String clusterName : clusterNames) {
- assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(clusterName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
index c1fb253..827ec42 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
@@ -5,6 +5,8 @@
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
@@ -31,8 +33,11 @@
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
- // bind(Cluster.class).to(ClusterFromZK.class);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ bind(ZkClient.class).toInstance(zkClient);
}
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index df2d8f1..582f5d0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -8,6 +8,7 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
import org.slf4j.Logger;
@@ -15,6 +16,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.Scopes;
import com.google.inject.name.Names;
public class BaseModule extends AbstractModule {
@@ -45,6 +47,9 @@
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(S4Bootstrap.class);
+ // share the Zookeeper connection
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+
}
@SuppressWarnings("serial")
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 7789b9f..c4cafee 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -28,6 +28,8 @@
import org.apache.s4.base.Hasher;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.ZkRemoteStreams;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.ft.NoOpCheckpointingFramework;
import org.apache.s4.core.staging.BlockingRemoteSendersExecutorServiceFactory;
@@ -45,6 +47,7 @@
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Provides;
+import com.google.inject.Scopes;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
@@ -81,7 +84,7 @@
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);
- bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
+ bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
bind(S4RLoaderFactory.class);
@@ -95,6 +98,9 @@
bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);
+ bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
+ bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+
}
@Provides
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
new file mode 100644
index 0000000..6aaa8f1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+
+/**
+ * Default {@link RemoteSenders} implementation: sends an event to each remote cluster consuming its stream. A
+ * {@link RemoteSender} is created lazily per target cluster and cached; sends are submitted to an executor.
+ */
+public class DefaultRemoteSenders implements RemoteSenders {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
+
+    final RemoteEmitters remoteEmitters;
+    final RemoteStreams remoteStreams;
+    final Clusters remoteClusters;
+    final SerializerDeserializer serDeser;
+    final Hasher hasher;
+
+    /** Cache of senders, keyed by the name of the remote cluster they target. */
+    final ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
+    private final ExecutorService executorService;
+
+    /**
+     * Injection constructor: stores the collaborators, obtains the executor from {@code senderExecutorFactory} and
+     * creates the serializer with the context class loader of the constructing thread.
+     */
+    @Inject
+    public DefaultRemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
+        this.remoteEmitters = remoteEmitters;
+        this.remoteStreams = remoteStreams;
+        this.remoteClusters = remoteClusters;
+        this.hasher = hasher;
+        executorService = senderExecutorFactory.create();
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
+    }
+
+    /**
+     * Submits one send task per consumer of the event's stream; the event's app id is set to -1 beforehand.
+     *
+     * @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
+     */
+    @Override
+    public void send(String hashKey, Event event) {
+        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
+        event.setAppId(-1);
+        for (StreamConsumer consumer : consumers) {
+            // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
+            // represented by a single stream consumer
+            RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
+            if (sender == null) {
+                RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
+                        .getClusterName())), hasher, consumer.getClusterName());
+                // TODO cleanup when remote topologies die
+                sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
+                if (sender == null) {
+                    sender = newSender;
+                }
+            }
+            // NOTE: this implies multiple serializations, there might be an optimization
+            executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
+        }
+    }
+
+    /** Serializes one event and passes it to one remote sender; an interrupted send drops the event. */
+    class SendToRemoteClusterTask implements Runnable {
+        private final String hashKey;
+        private final Event event;
+        private final RemoteSender sender;
+
+        public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
+            this.hashKey = hashKey;
+            this.event = event;
+            this.sender = sender;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sender.send(hashKey, serDeser.serialize(event));
+            } catch (InterruptedException e) {
+                // the exception goes last so that SLF4J records it in addition to the formatted message
+                logger.error("Interrupted blocking send operation for event {}. Event is lost.", event, e);
+                Thread.currentThread().interrupt(); // restore the interrupt flag for whoever runs this task
+            }
+        }
+    }
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index 7c7238f..9222450 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -1,123 +1,14 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.s4.core;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-
import org.apache.s4.base.Event;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
-import org.apache.s4.comm.tcp.RemoteEmitters;
-import org.apache.s4.comm.topology.Clusters;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.comm.topology.StreamConsumer;
-import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
/**
* Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
* the event.
*
*/
-public class RemoteSenders {
+public interface RemoteSenders {
- Logger logger = LoggerFactory.getLogger(RemoteSenders.class);
+ public abstract void send(String hashKey, Event event);
- final RemoteEmitters remoteEmitters;
-
- final RemoteStreams remoteStreams;
-
- final Clusters remoteClusters;
-
- final SerializerDeserializer serDeser;
-
- final Hasher hasher;
-
- ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
-
- private final ExecutorService executorService;
-
- @Inject
- public RemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteStreams, Clusters remoteClusters,
- SerializerDeserializerFactory serDeserFactory, Hasher hasher,
- RemoteSendersExecutorServiceFactory senderExecutorFactory) {
- this.remoteEmitters = remoteEmitters;
- this.remoteStreams = remoteStreams;
- this.remoteClusters = remoteClusters;
- this.hasher = hasher;
- executorService = senderExecutorFactory.create();
-
- serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
- }
-
- public void send(String hashKey, Event event) {
-
- Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
- event.setAppId(-1);
- for (StreamConsumer consumer : consumers) {
- // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
- // represented by a single stream consumer
- RemoteSender sender = sendersByTopology.get(consumer.getClusterName());
- if (sender == null) {
- RemoteSender newSender = new RemoteSender(remoteEmitters.getEmitter(remoteClusters.getCluster(consumer
- .getClusterName())), hasher, consumer.getClusterName());
- // TODO cleanup when remote topologies die
- sender = sendersByTopology.putIfAbsent(consumer.getClusterName(), newSender);
- if (sender == null) {
- sender = newSender;
- }
- }
- // NOTE: this implies multiple serializations, there might be an optimization
- executorService.execute(new SendToRemoteClusterTask(hashKey, event, sender));
- }
- }
-
- class SendToRemoteClusterTask implements Runnable {
-
- String hashKey;
- Event event;
- RemoteSender sender;
-
- public SendToRemoteClusterTask(String hashKey, Event event, RemoteSender sender) {
- super();
- this.hashKey = hashKey;
- this.event = event;
- this.sender = sender;
- }
-
- @Override
- public void run() {
- try {
- sender.send(hashKey, serDeser.serialize(event));
- } catch (InterruptedException e) {
- logger.error("Interrupted blocking send operation for event {}. Event is lost.", event);
- Thread.currentThread().interrupt();
- }
-
- }
-
- }
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
new file mode 100644
index 0000000..19c4ecc
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
@@ -0,0 +1,37 @@
+package org.apache.s4.core;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+
+/**
+ * Guice {@link Provider} that supplies a {@link ZkClient}, i.e. a connection to ZooKeeper.
+ * <p>
+ * The client is created when the provider is constructed, and every {@link #get()} call returns that same
+ * instance. To share it, bind {@link ZkClient} in singleton scope (see {@link BaseModule}).
+ */
+public class ZkClientProvider implements Provider<ZkClient> {
+
+    private final ZkClient zkClient;
+
+    /** Creates the client for the configured address and timeouts, and installs a {@link ZNRecordSerializer}. */
+    @Inject
+    public ZkClientProvider(@Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) {
+        ZkClient client = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
+        ZkSerializer znRecordSerializer = new ZNRecordSerializer();
+        client.setZkSerializer(znRecordSerializer);
+        this.zkClient = client;
+    }
+
+    /** Returns the client created by the constructor. */
+    @Override
+    public ZkClient get() {
+        return this.zkClient;
+    }
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
index e8b7e87..bcf88c4 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@
@Inject
public AssignmentFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
index 025fb75..0b48bbf 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@
@Inject
public ClusterFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
index 4ea5644..0038b21 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -19,6 +19,7 @@
package org.apache.s4.fixtures;
import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import com.google.inject.Inject;
@@ -28,10 +29,8 @@
@Inject
public ClustersFromZKNoFailFast(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
- super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
+ super(clusterName, connectionTimeout, zkClient);
}
@Override
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
index 1dadc5b..c4f1772 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java
@@ -32,8 +32,6 @@
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.PhysicalCluster;
-import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.RemoteSenders;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
@@ -58,8 +56,6 @@
/* Use Kryo to serialize events. */
install(new FactoryModuleBuilder().implement(SerializerDeserializer.class, KryoSerDeser.class).build(
SerializerDeserializerFactory.class));
- bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
- bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
Cluster mockedCluster = Mockito.mock(Cluster.class);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index ae19011..7900ecc 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,12 +20,15 @@
import org.apache.s4.comm.DeserializerExecutorFactory;
import org.apache.s4.comm.staging.MemoryAwareDeserializerExecutorFactory;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.core.RemoteSenders;
import org.apache.s4.core.staging.BlockingSenderExecutorServiceFactory;
import org.apache.s4.core.staging.BlockingStreamExecutorServiceFactory;
import org.apache.s4.core.staging.SenderExecutorServiceFactory;
import org.apache.s4.core.staging.StreamExecutorServiceFactory;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +60,9 @@
bind(SenderExecutorServiceFactory.class).to(BlockingSenderExecutorServiceFactory.class);
bind(DeserializerExecutorFactory.class).to(MemoryAwareDeserializerExecutorFactory.class);
+ bind(RemoteStreams.class).toInstance(Mockito.mock(RemoteStreams.class));
+ bind(RemoteSenders.class).toInstance(Mockito.mock(RemoteSenders.class));
+
bind(Integer.class).annotatedWith(Names.named("s4.sender.parallelism")).toInstance(8);
bind(Integer.class).annotatedWith(Names.named("s4.sender.workQueueSize")).toInstance(10000);