[S4-110] Formatting code to make the diff look better
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index c9cfcb7..cf2cd40 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -33,7 +33,7 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.name.Names;
-public class HelixBasedCommModule extends AbstractModule{
+public class HelixBasedCommModule extends AbstractModule {
private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
InputStream commConfigInputStream;
@@ -74,7 +74,8 @@
// a node holds a single partition assignment
// ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
- bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(TaskStateModelFactory.class);
+ bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+ TaskStateModelFactory.class);
bind(Assignment.class).to(AssignmentFromHelix.class);
bind(Cluster.class).to(ClusterFromHelix.class);
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
index 5d79859..4c89aab 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
@@ -24,7 +24,7 @@
/**
* Used for creating RemoteEmitter instances depending on the cluster configuration. Follows the "assisted injection"
* pattern from Guice 3.
- *
+ *
*/
public interface RemoteEmitterFactory {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
index 9eebd06..ea35bd6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
@@ -10,52 +10,45 @@
import org.apache.helix.participant.statemachine.Transition;
@StateModelInfo(states = { "LEADER,STANDBY" }, initialState = "OFFLINE")
-public class S4StateModel extends StateModel
-{
- private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
+public class S4StateModel extends StateModel {
+ private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
- private final String streamName;
- private final String partitionId;
+ private final String streamName;
+ private final String partitionId;
- public S4StateModel(String partitionName)
- {
- String[] parts = partitionName.split("_");
- this.streamName = parts[0];
- this.partitionId = parts[1];
- }
+ public S4StateModel(String partitionName) {
+ String[] parts = partitionName.split("_");
+ this.streamName = parts[0];
+ this.partitionId = parts[1];
+ }
- @Transition(from = "OFFLINE", to = "STANDBY")
- public void becomeLeaderFromOffline(Message msg, NotificationContext context)
- {
- logger.info("Transitioning from " + msg.getFromState() + " to "
- + msg.getToState() + "for " + msg.getPartitionName());
- }
+ @Transition(from = "OFFLINE", to = "STANDBY")
+ public void becomeLeaderFromOffline(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + " for "
+ + msg.getPartitionName());
+ }
- @Transition(from = "STANDBY", to = "LEADER")
- public void becomeLeaderFromStandby(Message msg, NotificationContext context)
- {
- logger.info("Transitioning from " + msg.getFromState() + " to "
- + msg.getToState() + "for " + msg.getPartitionName());
- }
+ @Transition(from = "STANDBY", to = "LEADER")
+ public void becomeLeaderFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + " for "
+ + msg.getPartitionName());
+ }
- @Transition(from = "LEADER", to = "STANDBY")
- public void becomeStandbyFromLeader(Message msg, NotificationContext context)
- {
- logger.info("Transitioning from " + msg.getFromState() + " to "
- + msg.getToState() + "for " + msg.getPartitionName());
- }
+ @Transition(from = "LEADER", to = "STANDBY")
+ public void becomeStandbyFromLeader(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + " for "
+ + msg.getPartitionName());
+ }
- @Transition(from = "STANDBY", to = "OFFLINE")
- public void becomeOfflineFromStandby(Message msg, NotificationContext context)
- {
- logger.info("Transitioning from " + msg.getFromState() + " to "
- + msg.getToState() + "for " + msg.getPartitionName());
- }
-
- @Transition(from = "OFFLINE", to = "DROPPED")
- public void dropPartition(Message msg, NotificationContext context)
- {
- logger.info("Dropping partition" + msg.getPartitionName());
- }
+ @Transition(from = "STANDBY", to = "OFFLINE")
+ public void becomeOfflineFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + " for "
+ + msg.getPartitionName());
+ }
+
+ @Transition(from = "OFFLINE", to = "DROPPED")
+ public void dropPartition(Message msg, NotificationContext context) {
+        logger.info("Dropping partition " + msg.getPartitionName());
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
index eef64ea..3b885a8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -2,8 +2,7 @@
import org.apache.helix.participant.statemachine.StateModelFactory;
-
-public class TaskStateModelFactory extends StateModelFactory<S4StateModel>{
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel> {
@Override
public S4StateModel createNewStateModel(String partitionName) {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index a0e5002..120eee3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -65,8 +65,7 @@
*/
public class TCPEmitter implements Emitter, ClusterChangeListener {
- private static final Logger logger = LoggerFactory
- .getLogger(TCPEmitter.class);
+ private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
private final int nettyTimeout;
@@ -87,24 +86,21 @@
private final Lock lock;
@Inject
- SerializerDeserializer serDeser= new KryoSerDeser();
+ SerializerDeserializer serDeser = new KryoSerDeser();
@Inject
- public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
- throws InterruptedException {
+ public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
this.nettyTimeout = timeout;
this.topology = topology;
this.lock = new ReentrantLock();
// Initialize data structures
- //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+ // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
// TODO cluster can grow in size
- nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
- .<InstanceConfig, Channel> create());
+ nodeChannelMap = Maps.synchronizedBiMap(HashBiMap.<InstanceConfig, Channel> create());
// Initialize netty related structures
- ChannelFactory factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(),
+ ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -137,19 +133,16 @@
}
try {
- ChannelFuture connectFuture = this.bootstrap
- .connect(new InetSocketAddress(config.getHostName(),
- Integer.parseInt(config.getPort())));
+ ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(config.getHostName(), Integer
+ .parseInt(config.getPort())));
connectFuture.await();
if (connectFuture.isSuccess()) {
channels.add(connectFuture.getChannel());
- nodeChannelMap
- .forcePut(config, connectFuture.getChannel());
+ nodeChannelMap.forcePut(config, connectFuture.getChannel());
return true;
}
} catch (InterruptedException ie) {
- logger.error(String.format("Interrupted while connecting to %s:%d",
- config.getHostName(), config.getPort()));
+ logger.error(String.format("Interrupted while connecting to %s:%d", config.getHostName(), config.getPort()));
Thread.currentThread().interrupt();
}
return false;
@@ -158,8 +151,7 @@
private void sendMessage(String streamName, int partitionId, byte[] message) {
ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
buffer.writeBytes(message);
- InstanceConfig config = topology
- .getDestination(streamName, partitionId);
+ InstanceConfig config = topology.getDestination(streamName, partitionId);
if (!nodeChannelMap.containsKey(config)) {
if (!connectTo(config)) {
// Couldn't connect, discard message
@@ -176,8 +168,7 @@
@Override
public boolean send(int partitionId, EventMessage message) {
- sendMessage(message.getStreamName(), partitionId,
- serDeser.serialize(message));
+ sendMessage(message.getStreamName(), partitionId, serDeser.serialize(message));
return true;
}
@@ -188,8 +179,7 @@
}
c.close().addListener(new ChannelFutureListener() {
@Override
- public void operationComplete(ChannelFuture future)
- throws Exception {
+ public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
channels.remove(future.getChannel());
else
@@ -208,18 +198,18 @@
}
}
- //@Override
+ // @Override
public int getPartitionCount() {
return topology.getPhysicalCluster().getPartitionCount();
}
+
public int getPartitionCount(String streamName) {
- return topology.getPhysicalCluster().getPartitionCount();
- }
+ return topology.getPhysicalCluster().getPartitionCount();
+ }
class ExceptionHandler extends SimpleChannelUpstreamHandler {
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
Throwable t = e.getCause();
if (t instanceof ClosedChannelException) {
nodeChannelMap.inverse().remove(e.getChannel());
@@ -248,10 +238,8 @@
try {
// TODO handle possible cluster reconfiguration between send
// and failure callback
- logger.warn(
- "Failed to send message to node {} (according to current cluster information)",
- topology.getPhysicalCluster().getNodes()
- .get(partitionId));
+ logger.warn("Failed to send message to node {} (according to current cluster information)",
+ topology.getPhysicalCluster().getNodes().get(partitionId));
} catch (IndexOutOfBoundsException ignored) {
// cluster was changed
}
@@ -262,6 +250,6 @@
@Override
public void onChange() {
-
+
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 952306a..dfe98bf 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -34,100 +34,80 @@
* Used for defining and dimensioning logical clusters in Zookeeper.
*
*/
-public class TaskSetup
-{
+public class TaskSetup {
- private ZkClient zkclient;
- private boolean isHelixEnabled = true;
- private HelixAdmin helixAdmin;
- org.apache.helix.manager.zk.ZkClient helixZkClient;
+ private ZkClient zkclient;
+ private boolean isHelixEnabled = true;
+ private HelixAdmin helixAdmin;
+ org.apache.helix.manager.zk.ZkClient helixZkClient;
- public TaskSetup(String zookeeperAddress)
- {
- if (isHelixEnabled)
- {
- helixAdmin = new ZKHelixAdmin(zookeeperAddress);
- } else
- {
- zkclient = new ZkClient(zookeeperAddress);
- zkclient.setZkSerializer(new ZNRecordSerializer());
- if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS))
- {
- throw new RuntimeException(
- "Could not connect to ZooKeeper after 10 seconds.");
- }
- }
- }
-
- public void clean(String clusterName)
- {
- if (isHelixEnabled)
- {
- helixAdmin.dropCluster(clusterName);
- } else
- {
- zkclient.deleteRecursive("/s4/clusters/" + clusterName);
- }
- }
-
- public void setup(String cluster, int tasks, int initialPort)
- {
- if (isHelixEnabled)
- {
- helixAdmin.addCluster(cluster, false);
- StateModelDefinition onlineofflinemodel = new StateModelDefinition(
- new StateModelConfigGenerator().generateConfigForOnlineOffline());
- StateModelDefinition leaderstandbymodel = new StateModelDefinition(
- new StateModelConfigGenerator().generateConfigForLeaderStandby());
-
- helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
- helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
-
- for (int i = 0; i < tasks; i++)
- {
- InstanceConfig instanceConfig = new InstanceConfig("localhost_"
- + initialPort);
- instanceConfig.setHostName("localhost");
- instanceConfig.setPort("" + initialPort);
- helixAdmin.addInstance(cluster, instanceConfig);
- initialPort = initialPort + 1;
- }
-
- return;
- }
- try
- {
- zkclient.createPersistent("/s4/streams", true);
- } catch (ZkException ignored)
- {
- // ignore if exists
+ public TaskSetup(String zookeeperAddress) {
+ if (isHelixEnabled) {
+ helixAdmin = new ZKHelixAdmin(zookeeperAddress);
+ } else {
+ zkclient = new ZkClient(zookeeperAddress);
+ zkclient.setZkSerializer(new ZNRecordSerializer());
+ if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Could not connect to ZooKeeper after 10 seconds.");
+ }
+ }
}
- zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
- zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
- zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
- for (int i = 0; i < tasks; i++)
- {
- String taskId = "Task-" + i;
- ZNRecord record = new ZNRecord(taskId);
- record.putSimpleField("taskId", taskId);
- record.putSimpleField("port", String.valueOf(initialPort + i));
- record.putSimpleField("partition", String.valueOf(i));
- record.putSimpleField("cluster", cluster);
- zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId,
- record);
+ public void clean(String clusterName) {
+ if (isHelixEnabled) {
+ helixAdmin.dropCluster(clusterName);
+ } else {
+ zkclient.deleteRecursive("/s4/clusters/" + clusterName);
+ }
}
- }
- public void disconnect()
- {
- if (isHelixEnabled)
- {
- helixZkClient.close();
- } else
- {
- zkclient.close();
+ public void setup(String cluster, int tasks, int initialPort) {
+ if (isHelixEnabled) {
+ helixAdmin.addCluster(cluster, false);
+ StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+ new StateModelConfigGenerator().generateConfigForOnlineOffline());
+ StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+ new StateModelConfigGenerator().generateConfigForLeaderStandby());
+
+ helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
+ helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
+
+ for (int i = 0; i < tasks; i++) {
+ InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setPort("" + initialPort);
+ helixAdmin.addInstance(cluster, instanceConfig);
+ initialPort = initialPort + 1;
+ }
+
+ return;
+ }
+ try {
+ zkclient.createPersistent("/s4/streams", true);
+ } catch (ZkException ignored) {
+ // ignore if exists
+ }
+
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
+ for (int i = 0; i < tasks; i++) {
+ String taskId = "Task-" + i;
+ ZNRecord record = new ZNRecord(taskId);
+ record.putSimpleField("taskId", taskId);
+ record.putSimpleField("port", String.valueOf(initialPort + i));
+ record.putSimpleField("partition", String.valueOf(i));
+ record.putSimpleField("cluster", cluster);
+ zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId, record);
+ }
}
- }
+
+ public void disconnect() {
+ if (isHelixEnabled) {
+ helixZkClient.close();
+ } else {
+ zkclient.close();
+ }
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 7e0b4ac..243faac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -35,135 +35,109 @@
import com.google.inject.name.Named;
@Singleton
-public class AssignmentFromHelix implements Assignment
-{
- private static final Logger logger = LoggerFactory
- .getLogger(AssignmentFromHelix.class);
+public class AssignmentFromHelix implements Assignment {
+ private static final Logger logger = LoggerFactory.getLogger(AssignmentFromHelix.class);
- private String clusterName;
- private final String zookeeperAddress;
- private String machineId;
- private HelixManager zkHelixManager;
-
- private HelixDataAccessor helixDataAccessor;
- AtomicReference<ClusterNode> clusterNodeRef;
- private final Lock lock;
- private final AtomicBoolean currentlyOwningTask;
- private final Condition taskAcquired;
+ private String clusterName;
+ private final String zookeeperAddress;
+ private String machineId;
+ private HelixManager zkHelixManager;
- private final StateModelFactory<? extends StateModel> taskStateModelFactory;
- //private final StateModelFactory<? extends StateModel> appStateModelFactory;
+ private HelixDataAccessor helixDataAccessor;
+ AtomicReference<ClusterNode> clusterNodeRef;
+ private final Lock lock;
+ private final AtomicBoolean currentlyOwningTask;
+ private final Condition taskAcquired;
- @Inject
- public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName,
- @Named("s4.cluster.zk_address") String zookeeperAddress
- ) throws Exception
- {
- this.taskStateModelFactory = new TaskStateModelFactory();
-// this.appStateModelFactory = appStateModelFactory;
- this.clusterName = clusterName;
- this.zookeeperAddress = zookeeperAddress;
- machineId = "localhost";
- lock = new ReentrantLock();
- ZkClient zkClient = new ZkClient(zookeeperAddress);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
- BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(
- zkClient);
- helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
- clusterNodeRef = new AtomicReference<ClusterNode>();
- taskAcquired = lock.newCondition();
- currentlyOwningTask = new AtomicBoolean(true);
- try
- {
- machineId = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e)
- {
- logger.warn("Unable to get hostname", e);
- machineId = "UNKNOWN";
- }
- ClusterNode node = new ClusterNode(-1,
- Integer.parseInt(instanceName.split("_")[1]), machineId,
- instanceName);
- clusterNodeRef.set(node);
- currentlyOwningTask.set(true);
- }
+ private final StateModelFactory<? extends StateModel> taskStateModelFactory;
- @Inject
- public void init()
- {
- //joinCluster();
- }
+ // private final StateModelFactory<? extends StateModel> appStateModelFactory;
- @Override
- public ClusterNode assignClusterNode()
- {
- lock.lock();
- try
- {
- while (!currentlyOwningTask.get())
- {
- taskAcquired.awaitUninterruptibly();
- }
- } catch (Exception e)
- {
- logger.error("Exception while waiting to join the cluster");
- return null;
- } finally
- {
- lock.unlock();
- }
- return clusterNodeRef.get();
- }
-
- public void joinClusterOld()
- {
- lock.lock();
- try
- {
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- do
- {
- List<InstanceConfig> instances = helixDataAccessor
- .getChildValues(keyBuilder.instanceConfigs());
- List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
- .liveInstances());
- for (InstanceConfig instanceConfig : instances)
- {
- String instanceName = instanceConfig.getInstanceName();
- if (!liveInstances.contains(instanceName))
- {
- zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
- instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
- zkHelixManager.getStateMachineEngine().registerStateModelFactory(
- "LeaderStandby", taskStateModelFactory);
-
- zkHelixManager.connect();
- ClusterNode node = new ClusterNode(-1,
- Integer.parseInt(instanceConfig.getPort()), machineId,
- instanceName);
- clusterNodeRef.set(node);
- currentlyOwningTask.set(true);
- taskAcquired.signalAll();
- break;
- }
+ @Inject
+ public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+ throws Exception {
+ this.taskStateModelFactory = new TaskStateModelFactory();
+ // this.appStateModelFactory = appStateModelFactory;
+ this.clusterName = clusterName;
+ this.zookeeperAddress = zookeeperAddress;
+ machineId = "localhost";
+ lock = new ReentrantLock();
+ ZkClient zkClient = new ZkClient(zookeeperAddress);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+ BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+ helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+ clusterNodeRef = new AtomicReference<ClusterNode>();
+ taskAcquired = lock.newCondition();
+ currentlyOwningTask = new AtomicBoolean(true);
+ try {
+ machineId = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Unable to get hostname", e);
+ machineId = "UNKNOWN";
}
- if (instances.size() == liveInstances.size())
- {
- System.out
- .println("No more nodes can join the cluster. Will wait for some node to die.");
- Thread.sleep(100000);
- }
- } while (!currentlyOwningTask.get());
- System.out.println("Joined the cluster:"+ clusterName +" as "+ clusterNodeRef.get().getTaskId());
- } catch (Exception e)
- {
- e.printStackTrace();
- } finally
- {
- lock.unlock();
+ ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
+ clusterNodeRef.set(node);
+ currentlyOwningTask.set(true);
}
- }
+
+ @Inject
+ public void init() {
+ // joinCluster();
+ }
+
+ @Override
+ public ClusterNode assignClusterNode() {
+ lock.lock();
+ try {
+ while (!currentlyOwningTask.get()) {
+ taskAcquired.awaitUninterruptibly();
+ }
+ } catch (Exception e) {
+ logger.error("Exception while waiting to join the cluster");
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ return clusterNodeRef.get();
+ }
+
+ public void joinClusterOld() {
+ lock.lock();
+ try {
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ do {
+ List<InstanceConfig> instances = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
+ List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+ for (InstanceConfig instanceConfig : instances) {
+ String instanceName = instanceConfig.getInstanceName();
+ if (!liveInstances.contains(instanceName)) {
+ zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+ InstanceType.PARTICIPANT, zookeeperAddress);
+ zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
+ taskStateModelFactory);
+
+ zkHelixManager.connect();
+ ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
+ instanceName);
+ clusterNodeRef.set(node);
+ currentlyOwningTask.set(true);
+ taskAcquired.signalAll();
+ break;
+ }
+ }
+ if (instances.size() == liveInstances.size()) {
+ System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
+ Thread.sleep(100000);
+ }
+ } while (!currentlyOwningTask.get());
+ System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..c482624 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@
/**
* Handles partition assignment through Zookeeper.
- *
+ *
*/
@Singleton
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 3226fab..19e0628 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -46,172 +46,145 @@
import org.apache.helix.spectator.RoutingTableProvider;
/**
- * Represents a logical cluster definition fetched from Zookeeper. Notifies
- * listeners of runtime changes in the configuration.
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
+ * configuration.
*
*/
-public class ClusterFromHelix extends RoutingTableProvider implements Cluster
-{
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
- private static Logger logger = LoggerFactory
- .getLogger(ClusterFromHelix.class);
+ private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
- private final String clusterName;
- private final AtomicReference<PhysicalCluster> clusterRef;
- private final List<ClusterChangeListener> listeners;
- private final Lock lock;
- private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+ private final String clusterName;
+ private final AtomicReference<PhysicalCluster> clusterRef;
+ private final List<ClusterChangeListener> listeners;
+ private final Lock lock;
+ private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
- /**
- * only the local topology
- */
- @Inject
- public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
- throws Exception
- {
- this.clusterName = clusterName;
- Map<String, Integer> map = Collections.emptyMap();
- partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
- this.clusterRef = new AtomicReference<PhysicalCluster>();
- this.listeners = new ArrayList<ClusterChangeListener>();
- lock = new ReentrantLock();
+ /**
+ * only the local topology
+ */
+ @Inject
+ public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ this.clusterName = clusterName;
+ Map<String, Integer> map = Collections.emptyMap();
+ partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ lock = new ReentrantLock();
- }
+ }
- /**
- * any topology
- */
- public ClusterFromHelix(String clusterName, ZkClient zkClient,
- String machineId)
- {
- this.clusterName = clusterName;
- Map<String, Integer> map = Collections.emptyMap();
- partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
- this.clusterRef = new AtomicReference<PhysicalCluster>();
- this.listeners = new ArrayList<ClusterChangeListener>();
- lock = new ReentrantLock();
+ /**
+ * any topology
+ */
+ public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+ this.clusterName = clusterName;
+ Map<String, Integer> map = Collections.emptyMap();
+ partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+ this.clusterRef = new AtomicReference<PhysicalCluster>();
+ this.listeners = new ArrayList<ClusterChangeListener>();
+ lock = new ReentrantLock();
- }
+ }
- @Override
- public void onExternalViewChange(List<ExternalView> externalViewList,
- NotificationContext changeContext)
- {
- lock.lock();
- try
- {
- logger.info("Start:Processing change in cluster topology");
- super.onExternalViewChange(externalViewList, changeContext);
- HelixManager manager = changeContext.getManager();
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- List<String> resources = helixDataAccessor.getChildNames(keyBuilder
- .idealStates());
- Map<String,Integer> map = new HashMap<String, Integer>();
- for (String resource : resources)
- {
- String resourceType = configAccessor.get(
- builder.forCluster(clusterName).forResource(resource)
- .build(), "type");
- if("Task".equalsIgnoreCase(resourceType)){
- String streamName = configAccessor.get(
- builder.forCluster(clusterName).forResource(resource)
- .build(), "streamName");
- IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
- map.put(streamName, idealstate.getNumPartitions());
+ @Override
+ public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+ lock.lock();
+ try {
+ logger.info("Start:Processing change in cluster topology");
+ super.onExternalViewChange(externalViewList, changeContext);
+ HelixManager manager = changeContext.getManager();
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ for (String resource : resources) {
+ String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
+ "type");
+ if ("Task".equalsIgnoreCase(resourceType)) {
+ String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
+ .build(), "streamName");
+ IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+ map.put(streamName, idealstate.getNumPartitions());
+ }
+ }
+ partitionCountMapRef.set(map);
+ for (ClusterChangeListener listener : listeners) {
+ listener.onChange();
+ }
+ logger.info("End:Processing change in cluster topology");
+
+ } catch (Exception e) {
+ logger.error("", e);
+ } finally {
+ lock.unlock();
}
- }
- partitionCountMapRef.set(map);
- for (ClusterChangeListener listener : listeners)
- {
- listener.onChange();
- }
- logger.info("End:Processing change in cluster topology");
-
- } catch (Exception e)
- {
- logger.error("", e);
- } finally
- {
- lock.unlock();
}
- }
- @Override
- public PhysicalCluster getPhysicalCluster()
- {
- return clusterRef.get();
- }
-
- @Override
- public void addListener(ClusterChangeListener listener)
- {
- logger.info("Adding topology change listener:" + listener);
- listeners.add(listener);
- }
-
- @Override
- public void removeListener(ClusterChangeListener listener)
- {
- logger.info("Removing topology change listener:" + listener);
- listeners.remove(listener);
- }
-
- @Override
- public int hashCode()
- {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((clusterName == null) ? 0 : clusterName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ClusterFromHelix other = (ClusterFromHelix) obj;
- if (clusterName == null)
- {
- if (other.clusterName != null)
- return false;
- } else if (!clusterName.equals(other.clusterName))
- return false;
- return true;
- }
-
- @Override
- public InstanceConfig getDestination(String streamName, int partitionId)
- {
- List<InstanceConfig> instances = getInstances(streamName, streamName + "_"
- + partitionId, "LEADER");
- if (instances.size() == 1)
- {
- return instances.get(0);
- } else
- {
- return null;
+ @Override
+ public PhysicalCluster getPhysicalCluster() {
+ return clusterRef.get();
}
- }
-
- @Override
- public Integer getPartitionCount(String streamName){
- Integer numPartitions = partitionCountMapRef.get().get(streamName);
- if(numPartitions==null){
- return -1;
+
+ @Override
+ public void addListener(ClusterChangeListener listener) {
+ logger.info("Adding topology change listener:" + listener);
+ listeners.add(listener);
}
- return numPartitions;
- }
+
+ @Override
+ public void removeListener(ClusterChangeListener listener) {
+ logger.info("Removing topology change listener:" + listener);
+ listeners.remove(listener);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClusterFromHelix other = (ClusterFromHelix) obj;
+ if (clusterName == null) {
+ if (other.clusterName != null)
+ return false;
+ } else if (!clusterName.equals(other.clusterName))
+ return false;
+ return true;
+ }
+
+ @Override
+ public InstanceConfig getDestination(String streamName, int partitionId) {
+ List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
+ if (instances.size() == 1) {
+ return instances.get(0);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public Integer getPartitionCount(String streamName) {
+ Integer numPartitions = partitionCountMapRef.get().get(streamName);
+ if (numPartitions == null) {
+ return -1;
+ }
+ return numPartitions;
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index f6d4df9..3e4567a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -42,7 +42,7 @@
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
* configuration.
- *
+ *
*/
public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
@@ -216,15 +216,13 @@
}
@Override
- public InstanceConfig getDestination(String streamName, int partitionId)
- {
- return null;
+ public InstanceConfig getDestination(String streamName, int partitionId) {
+ return null;
}
@Override
- public Integer getPartitionCount(String streamName)
- {
- return null;
+ public Integer getPartitionCount(String streamName) {
+ return null;
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
index e622107..1e117c8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
@@ -20,7 +20,7 @@
/**
* Represents an node.
- *
+ *
*/
public class ClusterNode {
private int partition;
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
index 217d4e7..4e5351b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
@@ -55,11 +55,8 @@
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
-
}
-
-
public Cluster getCluster(String clusterName) {
return clusters.get(clusterName);
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index ce53c1a..cbf209e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -99,8 +99,9 @@
}
public int getPartitionCount(String stream) {
- return numPartitions;
- }
+ return numPartitions;
+ }
+
/**
* @param node
*/
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 7720bbf..7b75ff4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -107,7 +107,9 @@
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreamsManager#getConsumers(java.lang.String)
*/
@Override
@@ -186,8 +188,11 @@
streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
}
- /* (non-Javadoc)
- * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String,
+ * java.lang.String)
*/
@Override
public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +224,9 @@
zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreamsManager#addInputStream(int, java.lang.String, java.lang.String)
*/
@Override
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
index 94c95b6..3eb6d6a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
@@ -2,22 +2,19 @@
import java.util.Set;
-public interface RemoteStreamsManager
-{
+public interface RemoteStreamsManager {
- public abstract Set<StreamConsumer> getConsumers(String streamName);
+ public abstract Set<StreamConsumer> getConsumers(String streamName);
- public abstract void addOutputStream(String appId, String clusterName,
- String streamName);
+ public abstract void addOutputStream(String appId, String clusterName, String streamName);
- /**
- * Publishes interest in a stream from an application.
- *
- * @param appId
- * @param clusterName
- * @param streamName
- */
- public abstract void addInputStream(int appId, String clusterName,
- String streamName);
+ /**
+ * Publishes interest in a stream from an application.
+ *
+ * @param appId
+ * @param clusterName
+ * @param streamName
+ */
+ public abstract void addInputStream(int appId, String clusterName, String streamName);
-}
\ No newline at end of file
+}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
index 957318d..845e649 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
@@ -2,26 +2,21 @@
import java.util.Set;
-public class RemoteStreamsManagerImpl implements RemoteStreamsManager
-{
+public class RemoteStreamsManagerImpl implements RemoteStreamsManager {
- @Override
- public Set<StreamConsumer> getConsumers(String streamName)
- {
- return null;
- }
+ @Override
+ public Set<StreamConsumer> getConsumers(String streamName) {
+ return null;
+ }
- @Override
- public void addOutputStream(String appId, String clusterName,
- String streamName)
- {
-
- }
+ @Override
+ public void addOutputStream(String appId, String clusterName, String streamName) {
- @Override
- public void addInputStream(int appId, String clusterName, String streamName)
- {
-
- }
+ }
+
+ @Override
+ public void addInputStream(int appId, String clusterName, String streamName) {
+
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
index b55f469..9858aef 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
@@ -24,12 +24,12 @@
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
/**
*
* Utility to serialize/deserialize data in ZK. <br/>
- * Using Json format and Gson library.
- * TODO: Explore other libraries like jackson much richer features.
- * Gson needs no-arg constructor to work with without additional work
+ * Using Json format and Gson library. TODO: Explore other libraries like jackson much richer features. Gson needs
+ * no-arg constructor to work with without additional work
*/
public class ZNRecordSerializer implements ZkSerializer {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
index e5a8689..cab6c0f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
@@ -25,72 +25,70 @@
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;
+
/**
*
* Overwriting the ZKclient since the org.I0Itec.zkclient.ZkClient does not expose some important methods
*/
public class ZkClient extends org.I0Itec.zkclient.ZkClient {
- public ZkClient(IZkConnection connection, int connectionTimeout,
- ZkSerializer zkSerializer) {
- super(connection, connectionTimeout, zkSerializer);
- }
+ public ZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer) {
+ super(connection, connectionTimeout, zkSerializer);
+ }
- public ZkClient(IZkConnection connection, int connectionTimeout) {
- super(connection, connectionTimeout);
- }
+ public ZkClient(IZkConnection connection, int connectionTimeout) {
+ super(connection, connectionTimeout);
+ }
- public ZkClient(IZkConnection connection) {
- super(connection);
- }
+ public ZkClient(IZkConnection connection) {
+ super(connection);
+ }
- public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
- super(zkServers, sessionTimeout, connectionTimeout);
- }
+ public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
+ super(zkServers, sessionTimeout, connectionTimeout);
+ }
- public ZkClient(String zkServers, int connectionTimeout) {
- super(zkServers, connectionTimeout);
- }
+ public ZkClient(String zkServers, int connectionTimeout) {
+ super(zkServers, connectionTimeout);
+ }
- public ZkClient(String serverstring) {
- super(serverstring);
- }
+ public ZkClient(String serverstring) {
+ super(serverstring);
+ }
- public IZkConnection getConnection() {
- return _connection;
- }
-
- public long getSessionId(){
- return ((ZkConnection)_connection).getZookeeper().getSessionId();
- }
+ public IZkConnection getConnection() {
+ return _connection;
+ }
- public Stat getStat(final String path) {
- Stat stat = retryUntilConnected(new Callable<Stat>() {
+ public long getSessionId() {
+ return ((ZkConnection) _connection).getZookeeper().getSessionId();
+ }
- @Override
- public Stat call() throws Exception {
- Stat stat = ((ZkConnection) _connection).getZookeeper().exists(
- path, false);
- return stat;
- }
- });
+ public Stat getStat(final String path) {
+ Stat stat = retryUntilConnected(new Callable<Stat>() {
- return stat;
- }
+ @Override
+ public Stat call() throws Exception {
+ Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+ return stat;
+ }
+ });
- @SuppressWarnings("unchecked")
- @Override
- public <T extends Object> T readData(String path,
- boolean returnNullIfPathNotExists) {
- T data = null;
- try {
- data = (T) readData(path, null);
- } catch (ZkNoNodeException e) {
- if (!returnNullIfPathNotExists) {
- throw e;
- }
- }
- return data;
- }
+ return stat;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
+ T data = null;
+ try {
+ data = (T) readData(path, null);
+ } catch (ZkNoNodeException e) {
+ if (!returnNullIfPathNotExists) {
+ throw e;
+ }
+ }
+ return data;
+ }
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index fb18dd6..a326535 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -39,7 +39,7 @@
/**
* UDP based emitter.
- *
+ *
*/
public class UDPEmitter implements Emitter, ClusterChangeListener {
private DatagramSocket socket;
@@ -103,11 +103,11 @@
return true;
}
- // @Override
+ // @Override
public int getPartitionCount() {
return topology.getPhysicalCluster().getPartitionCount();
}
-
+
@Override
public void onChange() {
refreshCluster();
@@ -131,8 +131,7 @@
}
@Override
- public int getPartitionCount(String stream)
- {
- return topology.getPhysicalCluster().getPartitionCount(stream);
+ public int getPartitionCount(String stream) {
+ return topology.getPhysicalCluster().getPartitionCount(stream);
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
index 050a6ec..da015d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -25,7 +25,7 @@
/**
* UDP-based emitter for sending events to remote clusters.
- *
+ *
*/
public class UDPRemoteEmitter extends UDPEmitter {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 97ec9af..1c07870 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -83,14 +83,14 @@
/* Use Kryo to serialize events. */
bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(AppStateModelFactory.class);
+ bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
+ AppStateModelFactory.class);
bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class);
-
- bind(RemoteSendersManager.class).to(RemoteSendersManagerImpl.class);
-
- bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
+ bind(RemoteSendersManager.class).to(RemoteSendersManagerImpl.class);
+
+ bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
bind(S4RLoaderFactory.class);
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 810b8f4..9ae356b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -155,11 +155,12 @@
}
combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
}
-
+
injector = Guice.createInjector(combinedModule);
- //start a HelixController to manage the cluster
+ // start a HelixController to manage the cluster
String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
- HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName, HelixControllerMain.STANDALONE);
+ HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName,
+ HelixControllerMain.STANDALONE);
if (mainArgs.appClass != null) {
logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
@@ -190,7 +191,7 @@
@Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
String clusterName = null;
-
+
@Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
String instanceName = null;
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 6bc52e3..c8da41a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -24,7 +24,7 @@
/**
* Sends events to a remote cluster.
- *
+ *
*/
public class RemoteSender {
@@ -41,7 +41,8 @@
public void send(String hashKey, EventMessage eventMessage) {
if (hashKey == null) {
// round robin by default
- emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())), eventMessage);
+ emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())),
+ eventMessage);
} else {
int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(eventMessage.getStreamName()));
emitter.send(partition, eventMessage);
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index a43aa7d..e8ed8e0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -38,7 +38,7 @@
/**
* Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
* the event.
- *
+ *
*/
public class RemoteSenders implements RemoteSendersManager {
@@ -61,7 +61,9 @@
Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.core.RemoteSendersManager#send(java.lang.String, org.apache.s4.base.Event)
*/
@Override
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
index 3e0e462..0f1f9fa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
@@ -2,9 +2,8 @@
import org.apache.s4.base.Event;
-public interface RemoteSendersManager
-{
+public interface RemoteSendersManager {
- public abstract void send(String hashKey, Event event);
+ public abstract void send(String hashKey, Event event);
-}
\ No newline at end of file
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
index 13f8ac8..ed2ce4a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
@@ -2,13 +2,11 @@
import org.apache.s4.base.Event;
-public class RemoteSendersManagerImpl implements RemoteSendersManager
-{
+public class RemoteSendersManagerImpl implements RemoteSendersManager {
- @Override
- public void send(String hashKey, Event event)
- {
-
- }
+ @Override
+ public void send(String hashKey, Event event) {
+
+ }
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 38f71a8..d47ee0b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -23,90 +23,69 @@
import com.google.common.io.Files;
@StateModelInfo(states = { "ONLINE,OFFLINE" }, initialState = "OFFLINE")
-public class AppStateModel extends StateModel
-{
- private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
- private final String appName;
- private final Server server;
+public class AppStateModel extends StateModel {
+ private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+ private final String appName;
+ private final Server server;
- public AppStateModel(Server server, String appName)
- {
- this.server = server;
- this.appName = appName;
- }
-
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void deploy(Message message, NotificationContext context) throws Exception
- {
- HelixManager manager = context.getManager();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
- String uriString = configAccessor.get(scope,
- DistributedDeploymentManager.S4R_URI);
- String clusterName = manager.getClusterName();
- try
- {
- URI uri = new URI(uriString);
- // fetch application
- File localS4RFileCopy;
- try
- {
- localS4RFileCopy = File.createTempFile("tmp", "s4r");
- } catch (IOException e1)
- {
- logger
- .error(
- "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
- appName, e1.getClass().getName() + "->" + e1.getMessage());
- throw new DeploymentFailedException("Cannot deploy application ["
- + appName + "]", e1);
- }
- localS4RFileCopy.deleteOnExit();
- try
- {
- if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
- Files.newOutputStreamSupplier(localS4RFileCopy)) == 0)
- {
- throw new DeploymentFailedException("Cannot copy archive from ["
- + uri.toString() + "] to [" + localS4RFileCopy.getAbsolutePath()
- + "] (nothing was copied)");
- }
- } catch (IOException e)
- {
- throw new DeploymentFailedException("Cannot deploy application ["
- + appName + "] from URI [" + uri.toString() + "] ", e);
- }
- // install locally
- App loaded = server.loadApp(localS4RFileCopy, appName);
- if (loaded != null)
- {
- logger.info("Successfully installed application {}", appName);
- // TODO sync with other nodes? (e.g. wait for other apps deployed before
- // starting?
- server.startApp(loaded, appName, clusterName);
- } else
- {
- throw new DeploymentFailedException("Cannot deploy application ["
- + appName + "] from URI [" + uri.toString()
- + "] : cannot start application");
- }
-
- } catch (URISyntaxException e)
- {
- logger
- .error(
- "Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ",
- new String[] { appName, uriString, e.getMessage() });
- throw new DeploymentFailedException("Cannot deploy application ["
- + appName + "]", e);
+ public AppStateModel(Server server, String appName) {
+ this.server = server;
+ this.appName = appName;
}
- }
- @Transition(from = "OFFLINE", to = "ONLINE")
- public void undeploy(Message message, NotificationContext context) throws Exception
- {
- //todo
- }
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void deploy(Message message, NotificationContext context) throws Exception {
+ HelixManager manager = context.getManager();
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
+ String uriString = configAccessor.get(scope, DistributedDeploymentManager.S4R_URI);
+ String clusterName = manager.getClusterName();
+ try {
+ URI uri = new URI(uriString);
+ // fetch application
+ File localS4RFileCopy;
+ try {
+ localS4RFileCopy = File.createTempFile("tmp", "s4r");
+ } catch (IOException e1) {
+ logger.error(
+ "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+ appName, e1.getClass().getName() + "->" + e1.getMessage());
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
+ }
+ localS4RFileCopy.deleteOnExit();
+ try {
+ if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
+ Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+ throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+ + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
+ }
+ } catch (IOException e) {
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+ + uri.toString() + "] ", e);
+ }
+ // install locally
+ App loaded = server.loadApp(localS4RFileCopy, appName);
+ if (loaded != null) {
+ logger.info("Successfully installed application {}", appName);
+ // TODO sync with other nodes? (e.g. wait for other apps deployed before
+ // starting?
+ server.startApp(loaded, appName, clusterName);
+ } else {
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+ + uri.toString() + "] : cannot start application");
+ }
+
+ } catch (URISyntaxException e) {
+ logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+ appName, uriString, e.getMessage() });
+ throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
+ }
+ }
+
+ @Transition(from = "OFFLINE", to = "ONLINE")
+ public void undeploy(Message message, NotificationContext context) throws Exception {
+ // todo
+ }
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
index 3af4531..ef6ae58 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -5,20 +5,20 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
+
@Singleton
-public class AppStateModelFactory extends StateModelFactory<AppStateModel>
-{
- private final Server server;
-
- @Inject
- public AppStateModelFactory(Server server){
- this.server = server;
-
- }
- @Override
- public AppStateModel createNewStateModel(String partitionName)
- {
- return new AppStateModel(server,partitionName);
- }
+public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
+ private final Server server;
+
+ @Inject
+ public AppStateModelFactory(Server server) {
+ this.server = server;
+
+ }
+
+ @Override
+ public AppStateModel createNewStateModel(String partitionName) {
+ return new AppStateModel(server, partitionName);
+ }
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index ed32085..5dcfa76 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -5,39 +5,24 @@
import com.google.inject.Inject;
import com.google.inject.name.Named;
-public class HelixBasedDeploymentManager implements DeploymentManager
-{
- private final Server server;
- boolean deployed = false;
- private final String clusterName;
+public class HelixBasedDeploymentManager implements DeploymentManager {
+ private final Server server;
+ boolean deployed = false;
+ private final String clusterName;
- @Inject
- public HelixBasedDeploymentManager(
- @Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
- Server server)
- {
- this.clusterName = clusterName;
- this.server = server;
+ @Inject
+ public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+ this.clusterName = clusterName;
+ this.server = server;
- }
+ }
- @Override
- public void start()
- {
- /* File s4r = new File(
- "/Users/kgopalak/Documents/projects/s4/incubator-s4-helix/myApp/build/libs/myApp.s4r");
- String appName = "myApp";
- try
- {
- App loaded = server.loadApp(s4r, "myApp");
- server.startApp(loaded, appName, clusterName);
- } catch (Exception e)
- {
- e.printStackTrace();
- }*/
- }
+ @Override
+ public void start() {
+
+ }
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
index 5bab316..6dfc82a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
@@ -38,7 +38,7 @@
Tools.parseArgs(clusterArgs, args);
try {
- logger.info("Adding new nodes [{}] to cluster [{}] node(s)", clusterArgs.nbNodes, clusterArgs.clusterName);
+ logger.info("Adding new nodes [{}] to cluster [{}] node(s)", clusterArgs.nbNodes, clusterArgs.clusterName);
HelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
int initialPort = clusterArgs.firstListeningPort;
if (clusterArgs.nbNodes > 0) {
@@ -74,14 +74,14 @@
@Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
String nodes = "";
-
+
@Parameter(names = "-zk", description = "Zookeeper connection string")
String zkConnectionString = "localhost:2181";
@Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
int firstListeningPort = -1;
-
- @Parameter(names = {"-ng","-nodeGroup"}, description = "Assign the nodes to one or more groups. This will be useful when you create task", required=false)
+
+ @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
String nodeGroup = "default";
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
index 38fbbe8..190c8c0 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
@@ -88,10 +88,10 @@
@Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
int firstListeningPort = -1;
-
+
@Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required = false, arity = 1)
String nodeGroup = "default";
-
+
}
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index ff1cb88..83a7088 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -47,14 +47,14 @@
for (String instanceName : instancesInCluster) {
InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
- if(nodeGroup.equals(deployArgs.nodeGroup)){
+ if (nodeGroup.equals(deployArgs.nodeGroup)) {
instancesInGroup.add(instanceName);
}
}
- for(String instanceName:instancesInGroup){
+ for (String instanceName : instancesInGroup) {
is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
}
-
+
admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
index b54187f..ce41ecd 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -19,64 +19,53 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-public class GenericEventAdapter
-{
+public class GenericEventAdapter {
- public static void main(String[] args)
- {
- AdapterArgs adapterArgs = new AdapterArgs();
+ public static void main(String[] args) {
+ AdapterArgs adapterArgs = new AdapterArgs();
- Tools.parseArgs(adapterArgs, args);
- try
- {
- String instanceName = "adapter";
- HelixManager manager = HelixManagerFactory.getZKHelixManager(
- adapterArgs.clusterName, instanceName, InstanceType.SPECTATOR,
- adapterArgs.zkConnectionString);
- ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName,
- adapterArgs.zkConnectionString, 30, 60);
- manager.connect();
- manager.addExternalViewChangeListener(cluster);
+ Tools.parseArgs(adapterArgs, args);
+ try {
+ String instanceName = "adapter";
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(adapterArgs.clusterName, instanceName,
+ InstanceType.SPECTATOR, adapterArgs.zkConnectionString);
+ ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
+ 30, 60);
+ manager.connect();
+ manager.addExternalViewChangeListener(cluster);
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- IdealState idealstate = helixDataAccessor.getProperty(keyBuilder
- .idealStates(adapterArgs.streamName));
- TCPEmitter emitter = new TCPEmitter(cluster, 1000);
- while (true)
- {
- int partitionId = ((int) (Math.random() * 1000))
- % idealstate.getNumPartitions();
- Event event = new Event();
- event.put("name", String.class,
- "Hello world to partition:" + partitionId);
- KryoSerDeser serializer = new KryoSerDeser();
- EventMessage message = new EventMessage("-1", adapterArgs.streamName,
- serializer.serialize(event));
- System.out.println("Sending event to partition:"+partitionId);
- emitter.send(partitionId, message);
- Thread.sleep(1000);
- }
- } catch (Exception e)
- {
- e.printStackTrace();
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(adapterArgs.streamName));
+ TCPEmitter emitter = new TCPEmitter(cluster, 1000);
+ while (true) {
+ int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
+ Event event = new Event();
+ event.put("name", String.class, "Hello world to partition:" + partitionId);
+ KryoSerDeser serializer = new KryoSerDeser();
+ EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+ System.out.println("Sending event to partition:" + partitionId);
+ emitter.send(partitionId, message);
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
}
- }
+ @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+ static class AdapterArgs extends S4ArgsBase {
- @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class AdapterArgs extends S4ArgsBase
- {
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
- @Parameter(names = "-zk", description = "ZooKeeper connection string")
- String zkConnectionString = "localhost:2181";
+ @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+ String clusterName;
- @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
- String clusterName;
+ @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
+ String streamName;
- @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
- String streamName;
-
- }
+ }
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
index 1b8214f..13b88ca 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
@@ -17,46 +17,43 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
-public class RebalanceTask extends S4ArgsBase
-{
- public static void main(String[] args)
- {
- RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
+public class RebalanceTask extends S4ArgsBase {
+ public static void main(String[] args) {
+ RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
- Tools.parseArgs(taskArgs, args);
+ Tools.parseArgs(taskArgs, args);
- HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
- // This does the assignment of partition to nodes. It uses a modified
- // version of consistent hashing to distribute active partitions and standbys
- // equally among nodes.
- IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
- List<String> instancesInGroup = new ArrayList<String>();
- List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
- for (String instanceName : instancesInCluster) {
- InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
- String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
- if (nodeGroup.equals(taskArgs.nodeGroup)) {
- instancesInGroup.add(instanceName);
+ HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+ // This does the assignment of partition to nodes. It uses a modified
+ // version of consistent hashing to distribute active partitions and standbys
+ // equally among nodes.
+ IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
+ List<String> instancesInGroup = new ArrayList<String>();
+ List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+ for (String instanceName : instancesInCluster) {
+ InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+ String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+ if (nodeGroup.equals(taskArgs.nodeGroup)) {
+ instancesInGroup.add(instanceName);
+ }
}
+ admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
}
- admin.rebalance(taskArgs.clusterName,currentAssignment, instancesInGroup);
- }
- @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
- static class RebalanceTaskArgs extends S4ArgsBase
- {
+ @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+ static class RebalanceTaskArgs extends S4ArgsBase {
- @Parameter(names = "-zk", description = "ZooKeeper connection string")
- String zkConnectionString = "localhost:2181";
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
- @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
- String clusterName;
+ @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+ String clusterName;
- @Parameter(names={"-id","-taskId"},description = "id of the task that produces/consumes a stream", required = true, arity = 1)
- String taskId;
+ @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+ String taskId;
- @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
- String nodeGroup = "default";
+ @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
+ String nodeGroup = "default";
- }
+ }
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
index bd4224e..2be58c1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
@@ -54,521 +54,413 @@
import com.beust.jcommander.Parameters;
import com.google.common.collect.Maps;
-public class S4Status extends S4ArgsBase
-{
- static Logger logger = LoggerFactory.getLogger(S4Status.class);
+public class S4Status extends S4ArgsBase {
+ static Logger logger = LoggerFactory.getLogger(S4Status.class);
- private static String NONE = "--";
+ private static String NONE = "--";
- public static void main(String[] args)
- {
+ public static void main(String[] args) {
- StatusArgs statusArgs = new StatusArgs();
- Tools.parseArgs(statusArgs, args);
+ StatusArgs statusArgs = new StatusArgs();
+ Tools.parseArgs(statusArgs, args);
- try
- {
- if (statusArgs.clusters.size() > 0)
- {
- for (String cluster : statusArgs.clusters)
- {
- HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster,
- "ADMIN", InstanceType.ADMINISTRATOR,
- statusArgs.zkConnectionString);
- manager.connect();
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- printClusterInfo(manager, cluster);
- List<String> resourcesInCluster = manager.getClusterManagmentTool()
- .getResourcesInCluster(cluster);
+ try {
+ if (statusArgs.clusters.size() > 0) {
+ for (String cluster : statusArgs.clusters) {
+ HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster, "ADMIN",
+ InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
+ manager.connect();
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ printClusterInfo(manager, cluster);
+ List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
- List<String> apps = new ArrayList<String>();
- List<String> tasks = new ArrayList<String>();
+ List<String> apps = new ArrayList<String>();
+ List<String> tasks = new ArrayList<String>();
- for (String resource : resourcesInCluster)
- {
- ConfigScope scope = builder.forCluster(cluster)
- .forResource(resource).build();
- String resourceType = configAccessor.get(scope, "type");
- if ("App".equals(resourceType))
- {
- apps.add(resource);
- } else if ("Task".equals(resourceType))
- {
- tasks.add(resource);
+ for (String resource : resourcesInCluster) {
+ ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
+ String resourceType = configAccessor.get(scope, "type");
+ if ("App".equals(resourceType)) {
+ apps.add(resource);
+ } else if ("Task".equals(resourceType)) {
+ tasks.add(resource);
+ }
+ }
+ if (statusArgs.apps == null && statusArgs.streams == null) {
+ statusArgs.apps = apps;
+ statusArgs.streams = tasks;
+ }
+ for (String app : statusArgs.apps) {
+ if (resourcesInCluster.contains(app)) {
+ printAppInfo(manager, cluster, app);
+ }
+ }
+
+ if (statusArgs.streams != null && statusArgs.streams.size() > 0) {
+ for (String stream : statusArgs.streams) {
+ if (resourcesInCluster.contains(stream)) {
+ printStreamInfo(manager, cluster, stream);
+ }
+ }
+ }
+ manager.disconnect();
+ }
}
- }
- if (statusArgs.apps == null && statusArgs.streams == null)
- {
- statusArgs.apps = apps;
- statusArgs.streams = tasks;
- }
- for (String app : statusArgs.apps)
- {
- if (resourcesInCluster.contains(app))
- {
- printAppInfo(manager, cluster, app);
+ } catch (Exception e) {
+ logger.error("Cannot get the status of S4", e);
+ }
+
+ }
+
+ private static void printStreamInfo(HelixManager manager, String cluster, String taskId) {
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
+ String streamName = configAccessor.get(scope, "streamName");
+ String taskType = configAccessor.get(scope, "taskType");
+
+ System.out.println("Task Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
+ inMiddle("Description", 90));
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
+ inMiddle(streamName + " " + taskType, 90));
+ System.out.println(generateEdge(130));
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
+ ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+ List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+ System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
+ System.out.println(generateEdge(130));
+ for (String partition : assignment.getPartitionSet()) {
+ Map<String, String> stateMap = view.getStateMap(partition);
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ for (String instance : stateMap.keySet()) {
+ sb.append(delim);
+ String state = stateMap.get(instance);
+ if (liveInstances.contains(instance)) {
+ sb.append(instance).append(":").append(state);
+ } else {
+ sb.append(instance).append(":").append("OFFLINE");
+ }
+ delim = ", ";
}
- }
+ System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+ }
+ System.out.println(generateEdge(130));
+ }
- if (statusArgs.streams != null && statusArgs.streams.size() > 0)
- {
- for (String stream : statusArgs.streams)
- {
- if (resourcesInCluster.contains(stream))
- {
- printStreamInfo(manager, cluster, stream);
- }
+ private static void printAppInfo(HelixManager manager, String cluster, String app) {
+ ConfigAccessor configAccessor = manager.getConfigAccessor();
+ ConfigScopeBuilder builder = new ConfigScopeBuilder();
+ ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
+ String uri = configAccessor.get(scope, "s4r_uri");
+
+ System.out.println("App Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
+ System.out.println(generateEdge(130));
+ HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = helixDataAccessor.keyBuilder();
+ IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
+ ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+ List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+ Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
+ Map<String, String> appStateMap = view.getStateMap(app);
+ System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
+ System.out.println(generateEdge(130));
+ for (String instance : assignmentMap.keySet()) {
+ String state = appStateMap.get(instance);
+ System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
+ inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+ }
+
+ System.out.println(generateEdge(130));
+
+ }
+
+ private static void printClusterInfo(HelixManager manager, String cluster) {
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ Builder keyBuilder = dataAccessor.keyBuilder();
+ List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+ List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+ if (liveInstances == null) {
+ liveInstances = Collections.emptyList();
+ }
+ System.out.println("Cluster Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
+ inMiddle("Active", 10), generateEdge(80));
+ System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
+ inMiddle("Port", 8), inMiddle("Active", 10));
+ System.out.println(generateEdge(130));
+
+ System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
+ inMiddle("" + liveInstances.size(), 8));
+ boolean first = true;
+
+ for (String instance : instances) {
+ InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
+ // System.out.println(config);
+ if (first) {
+ first = false;
+ } else {
+ System.out.format("%n%-50s", " ");
}
- }
- manager.disconnect();
+ System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
+ inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
+ inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
}
- }
- } catch (Exception e)
- {
- logger.error("Cannot get the status of S4", e);
+
+ System.out.println();
}
- }
+ @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+ static class StatusArgs extends S4ArgsBase {
- private static void printStreamInfo(HelixManager manager, String cluster,
- String taskId)
- {
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
- String streamName = configAccessor.get(scope, "streamName");
- String taskType = configAccessor.get(scope, "taskType");
+ @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+ List<String> apps;
- System.out.println("Task Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20),
- inMiddle("Cluster", 20), inMiddle("Description", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20),
- inMiddle(cluster, 20), inMiddle(streamName + " " + taskType, 90));
- System.out.println(generateEdge(130));
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- IdealState assignment = helixDataAccessor.getProperty(keyBuilder
- .idealStates(taskId));
- ExternalView view = helixDataAccessor.getProperty(keyBuilder
- .externalView(taskId));
- List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
- .liveInstances());
- System.out.format("%-50s%-100s%n", inMiddle("Partition", 50),
- inMiddle("State", 20));
- System.out.println(generateEdge(130));
- for (String partition : assignment.getPartitionSet())
- {
- Map<String, String> stateMap = view.getStateMap(partition);
- StringBuilder sb = new StringBuilder();
- String delim="";
- for (String instance : stateMap.keySet())
- {
- sb.append(delim);
- String state = stateMap.get(instance);
- if (liveInstances.contains(instance))
- {
- sb.append(instance).append(":").append(state);
- } else
- {
- sb.append(instance).append(":").append("OFFLINE");
+ @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
+ List<String> clusters;
+
+ @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+ List<String> streams;
+
+ @Parameter(names = "-zk", description = "ZooKeeper connection string")
+ String zkConnectionString = "localhost:2181";
+
+ @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+ int timeout = 10000;
+ }
+
+ private static void showAppsStatus(List<Cluster> clusters) {
+ System.out.println("App Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+ System.out.println(generateEdge(130));
+ for (Cluster cluster : clusters) {
+ if (!NONE.equals(cluster.app.name)) {
+ System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+ inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+ }
}
- delim=", ";
- }
- System.out.format("%-50s%-10s%n", inMiddle(partition, 50),
- inMiddle(sb.toString(), 100));
- }
- System.out.println(generateEdge(130));
- }
+ System.out.println(generateEdge(130));
- private static void printAppInfo(HelixManager manager, String cluster,
- String app)
- {
- ConfigAccessor configAccessor = manager.getConfigAccessor();
- ConfigScopeBuilder builder = new ConfigScopeBuilder();
- ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
- String uri = configAccessor.get(scope, "s4r_uri");
-
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
- inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20),
- inMiddle(cluster, 20), inMiddle(uri, 90));
- System.out.println(generateEdge(130));
- HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = helixDataAccessor.keyBuilder();
- IdealState assignment = helixDataAccessor.getProperty(keyBuilder
- .idealStates(app));
- ExternalView view = helixDataAccessor.getProperty(keyBuilder
- .externalView(app));
- List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
- .liveInstances());
- Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
- Map<String, String> appStateMap = view.getStateMap(app);
- System.out.format("%-50s%-20s%n", inMiddle("Node id", 50),
- inMiddle("DEPLOYED", 20));
- System.out.println(generateEdge(130));
- for (String instance : assignmentMap.keySet())
- {
- String state = appStateMap.get(instance);
- System.out
- .format(
- "%-50s%-10s%n",
- inMiddle(instance, 50),
- inMiddle((("ONLINE".equals(state) && liveInstances
- .contains(instance)) ? "Y" : "N"), 20));
}
- System.out.println(generateEdge(130));
+ private static void showClustersStatus(List<Cluster> clusters) {
+ System.out.println("Cluster Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+ System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+ generateEdge(80));
+ System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+ inMiddle("Host", 50), inMiddle("Port", 8));
+ System.out.println(generateEdge(130));
- }
-
- private static void printClusterInfo(HelixManager manager, String cluster)
- {
- HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
- Builder keyBuilder = dataAccessor.keyBuilder();
- List<String> instances = dataAccessor.getChildNames(keyBuilder
- .instanceConfigs());
- List<String> liveInstances = dataAccessor.getChildNames(keyBuilder
- .liveInstances());
- if (liveInstances == null)
- {
- liveInstances = Collections.emptyList();
- }
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20),
- inMiddle("Nodes", 20), inMiddle("Active", 10), generateEdge(80));
- System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ",
- inMiddle("Node id", 10), inMiddle("Host", 50), inMiddle("Port", 8),
- inMiddle("Active", 10));
- System.out.println(generateEdge(130));
-
- System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20),
- inMiddle("" + instances.size(), 8),
- inMiddle("" + liveInstances.size(), 8));
- boolean first = true;
-
- for (String instance : instances)
- {
- InstanceConfig config = dataAccessor.getProperty(keyBuilder
- .instanceConfig(instance));
- // System.out.println(config);
- if (first)
- {
- first = false;
- } else
- {
- System.out.format("%n%-50s", " ");
- }
- System.out
- .format(
- "%-10s%-46s%-10s%-10s",
- inMiddle("" + config.getId(), 10),
- inMiddle(config.getHostName(), 50),
- inMiddle(config.getPort() + "", 10),
- inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y"
- : "N", 10));
- }
-
- System.out.println();
- }
-
- @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
- static class StatusArgs extends S4ArgsBase
- {
-
- @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
- List<String> apps;
-
- @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
- List<String> clusters;
-
- @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
- List<String> streams;
-
- @Parameter(names = "-zk", description = "ZooKeeper connection string")
- String zkConnectionString = "localhost:2181";
-
- @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
- int timeout = 10000;
- }
-
- private static void showAppsStatus(List<Cluster> clusters)
- {
- System.out.println("App Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
- inMiddle("Cluster", 20), inMiddle("URI", 90));
- System.out.println(generateEdge(130));
- for (Cluster cluster : clusters)
- {
- if (!NONE.equals(cluster.app.name))
- {
- System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
- inMiddle(cluster.app.cluster, 20), cluster.app.uri);
- }
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static void showClustersStatus(List<Cluster> clusters)
- {
- System.out.println("Cluster Status");
- System.out.println(generateEdge(130));
- System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
- System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20),
- inMiddle("App", 20), inMiddle("Tasks", 10), generateEdge(80));
- System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ",
- inMiddle("Number", 8), inMiddle("Task id", 10), inMiddle("Host", 50),
- inMiddle("Port", 8));
- System.out.println(generateEdge(130));
-
- for (Cluster cluster : clusters)
- {
- System.out.format("%-20s%-20s%-10s%-10s",
- inMiddle(cluster.clusterName, 20), inMiddle(cluster.app.name, 20),
- inMiddle("" + cluster.taskNumber, 8),
- inMiddle("" + cluster.nodes.size(), 8));
- boolean first = true;
- for (ClusterNode node : cluster.nodes)
- {
- if (first)
- {
- first = false;
- } else
- {
- System.out.format("%n%-60s", " ");
+ for (Cluster cluster : clusters) {
+ System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+ inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+ inMiddle("" + cluster.nodes.size(), 8));
+ boolean first = true;
+ for (ClusterNode node : cluster.nodes) {
+ if (first) {
+ first = false;
+ } else {
+ System.out.format("%n%-60s", " ");
+ }
+ System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+ inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+ }
+ System.out.println();
}
- System.out.format("%-10s%-50s%-10s",
- inMiddle("" + node.getTaskId(), 10),
- inMiddle(node.getMachineName(), 50),
- inMiddle(node.getPort() + "", 10));
- }
- System.out.println();
+ System.out.println(generateEdge(130));
}
- System.out.println(generateEdge(130));
- }
- private static void showStreamsStatus(List<Stream> streams)
- {
- System.out.println("Stream Status");
- System.out.println(generateEdge(130));
- System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20),
- inMiddle("Producers", 55), inMiddle("Consumers", 55));
- System.out.println(generateEdge(130));
+ private static void showStreamsStatus(List<Stream> streams) {
+ System.out.println("Stream Status");
+ System.out.println(generateEdge(130));
+ System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+ inMiddle("Consumers", 55));
+ System.out.println(generateEdge(130));
- for (Stream stream : streams)
- {
- System.out
- .format(
- "%-20s%-55s%-55s%n",
- inMiddle(stream.streamName, 20),
- inMiddle(getFormatString(stream.producers, stream.clusterAppMap),
- 55),
- inMiddle(getFormatString(stream.consumers, stream.clusterAppMap),
- 55));
- }
- System.out.println(generateEdge(130));
-
- }
-
- private static String inMiddle(String content, int width)
- {
- int i = (width - content.length()) / 2;
- return String.format("%" + i + "s%s", " ", content);
- }
-
- private static String generateEdge(int length)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++)
- {
- sb.append("-");
- }
- return sb.toString();
- }
-
- /**
- * show as cluster1(app1), cluster2(app2)
- *
- * @param clusters
- * cluster list
- * @param clusterAppMap
- * <cluster,app>
- * @return
- */
- private static String getFormatString(Collection<String> clusters,
- Map<String, String> clusterAppMap)
- {
- if (clusters == null || clusters.size() == 0)
- {
- return NONE;
- } else
- {
- // show as: cluster1(app1), cluster2(app2)
- StringBuilder sb = new StringBuilder();
- for (String cluster : clusters)
- {
- String app = clusterAppMap.get(cluster);
- sb.append(cluster);
- if (!NONE.equals(app))
- {
- sb.append("(").append(app).append(")");
+ for (Stream stream : streams) {
+ System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+ inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+ inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
}
- sb.append(" ");
- }
- return sb.toString();
- }
- }
+ System.out.println(generateEdge(130));
- static class Stream
- {
-
- private final ZkClient zkClient;
- private final String consumerPath;
- private final String producerPath;
-
- String streamName;
- Set<String> producers = new HashSet<String>();// cluster name
- Set<String> consumers = new HashSet<String>();// cluster name
-
- Map<String, String> clusterAppMap = Maps.newHashMap();
-
- public Stream(String streamName, ZkClient zkClient) throws Exception
- {
- this.streamName = streamName;
- this.zkClient = zkClient;
- this.consumerPath = "/s4/streams/" + streamName + "/consumers";
- this.producerPath = "/s4/streams/" + streamName + "/producers";
- readStreamFromZk();
}
- private void readStreamFromZk() throws Exception
- {
- List<String> consumerNodes = zkClient.getChildren(consumerPath);
- for (String node : consumerNodes)
- {
- ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
- consumers.add(consumer.getSimpleField("clusterName"));
- }
-
- List<String> producerNodes = zkClient.getChildren(producerPath);
- for (String node : producerNodes)
- {
- ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
- producers.add(consumer.getSimpleField("clusterName"));
- }
-
- getAppNames();
+ private static String inMiddle(String content, int width) {
+ int i = (width - content.length()) / 2;
+ return String.format("%" + i + "s%s", " ", content);
}
- private void getAppNames()
- {
- Set<String> clusters = new HashSet<String>(consumers);
- clusters.addAll(producers);
- for (String cluster : clusters)
- {
- clusterAppMap.put(cluster, getApp(cluster, zkClient));
- }
- }
-
- public boolean containsCluster(String cluster)
- {
- if (producers.contains(cluster) || consumers.contains(cluster))
- {
- return true;
- }
- return false;
- }
-
- private static String getApp(String clusterName, ZkClient zkClient)
- {
- String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- if (zkClient.exists(appPath))
- {
- ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName
- + "/app/s4App");
- return appRecord.getSimpleField("name");
- }
- return NONE;
- }
- }
-
- static class App
- {
- private String name = NONE;
- private String cluster;
- private String uri = NONE;
- }
-
- static class Cluster
- {
- private final ZkClient zkClient;
- private final String taskPath;
- private final String processPath;
- private final String appPath;
-
- String clusterName;
- int taskNumber;
- App app;
-
- List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
- public Cluster(String clusterName, ZkClient zkClient) throws Exception
- {
- this.clusterName = clusterName;
- this.zkClient = zkClient;
- this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
- this.processPath = "/s4/clusters/" + clusterName + "/process";
- this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
- readClusterFromZk();
- }
-
- public void readClusterFromZk() throws Exception
- {
- List<String> processes;
- List<String> tasks;
-
- tasks = zkClient.getChildren(taskPath);
- processes = zkClient.getChildren(processPath);
-
- taskNumber = tasks.size();
-
- for (int i = 0; i < processes.size(); i++)
- {
- ZNRecord process = zkClient.readData(
- processPath + "/" + processes.get(i), true);
- if (process != null)
- {
- int partition = Integer.parseInt(process.getSimpleField("partition"));
- String host = process.getSimpleField("host");
- int port = Integer.parseInt(process.getSimpleField("port"));
- String taskId = process.getSimpleField("taskId");
- ClusterNode node = new ClusterNode(partition, port, host, taskId);
- nodes.add(node);
+ private static String generateEdge(int length) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ sb.append("-");
}
- }
-
- app = new App();
- app.cluster = clusterName;
- try
- {
- ZNRecord appRecord = zkClient.readData(appPath);
- app.name = appRecord.getSimpleField("name");
- app.uri = appRecord.getSimpleField("s4r_uri");
- } catch (ZkNoNodeException e)
- {
- logger.warn(appPath + " doesn't exist");
- }
+ return sb.toString();
}
- }
+ /**
+ * show as cluster1(app1), cluster2(app2)
+ *
+ * @param clusters
+ * cluster list
+ * @param clusterAppMap
+ * <cluster,app>
+ * @return
+ */
+ private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+ if (clusters == null || clusters.size() == 0) {
+ return NONE;
+ } else {
+ // show as: cluster1(app1), cluster2(app2)
+ StringBuilder sb = new StringBuilder();
+ for (String cluster : clusters) {
+ String app = clusterAppMap.get(cluster);
+ sb.append(cluster);
+ if (!NONE.equals(app)) {
+ sb.append("(").append(app).append(")");
+ }
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+ }
+
+ static class Stream {
+
+ private final ZkClient zkClient;
+ private final String consumerPath;
+ private final String producerPath;
+
+ String streamName;
+ Set<String> producers = new HashSet<String>();// cluster name
+ Set<String> consumers = new HashSet<String>();// cluster name
+
+ Map<String, String> clusterAppMap = Maps.newHashMap();
+
+ public Stream(String streamName, ZkClient zkClient) throws Exception {
+ this.streamName = streamName;
+ this.zkClient = zkClient;
+ this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+ this.producerPath = "/s4/streams/" + streamName + "/producers";
+ readStreamFromZk();
+ }
+
+ private void readStreamFromZk() throws Exception {
+ List<String> consumerNodes = zkClient.getChildren(consumerPath);
+ for (String node : consumerNodes) {
+ ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+ consumers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ List<String> producerNodes = zkClient.getChildren(producerPath);
+ for (String node : producerNodes) {
+ ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+ producers.add(consumer.getSimpleField("clusterName"));
+ }
+
+ getAppNames();
+ }
+
+ private void getAppNames() {
+ Set<String> clusters = new HashSet<String>(consumers);
+ clusters.addAll(producers);
+ for (String cluster : clusters) {
+ clusterAppMap.put(cluster, getApp(cluster, zkClient));
+ }
+ }
+
+ public boolean containsCluster(String cluster) {
+ if (producers.contains(cluster) || consumers.contains(cluster)) {
+ return true;
+ }
+ return false;
+ }
+
+ private static String getApp(String clusterName, ZkClient zkClient) {
+ String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ if (zkClient.exists(appPath)) {
+ ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+ return appRecord.getSimpleField("name");
+ }
+ return NONE;
+ }
+ }
+
+ static class App {
+ private String name = NONE;
+ private String cluster;
+ private String uri = NONE;
+ }
+
+ static class Cluster {
+ private final ZkClient zkClient;
+ private final String taskPath;
+ private final String processPath;
+ private final String appPath;
+
+ String clusterName;
+ int taskNumber;
+ App app;
+
+ List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+ public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+ this.clusterName = clusterName;
+ this.zkClient = zkClient;
+ this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+ this.processPath = "/s4/clusters/" + clusterName + "/process";
+ this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+ readClusterFromZk();
+ }
+
+ public void readClusterFromZk() throws Exception {
+ List<String> processes;
+ List<String> tasks;
+
+ tasks = zkClient.getChildren(taskPath);
+ processes = zkClient.getChildren(processPath);
+
+ taskNumber = tasks.size();
+
+ for (int i = 0; i < processes.size(); i++) {
+ ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+ if (process != null) {
+ int partition = Integer.parseInt(process.getSimpleField("partition"));
+ String host = process.getSimpleField("host");
+ int port = Integer.parseInt(process.getSimpleField("port"));
+ String taskId = process.getSimpleField("taskId");
+ ClusterNode node = new ClusterNode(partition, port, host, taskId);
+ nodes.add(node);
+ }
+ }
+
+ app = new App();
+ app.cluster = clusterName;
+ try {
+ ZNRecord appRecord = zkClient.readData(appPath);
+ app.name = appRecord.getSimpleField("name");
+ app.uri = appRecord.getSimpleField("s4r_uri");
+ } catch (ZkNoNodeException e) {
+ logger.warn(appPath + " doesn't exist");
+ }
+ }
+
+ }
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 824aa03..ab0ba3e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -219,7 +219,7 @@
/**
* show as cluster1(app1), cluster2(app2)
- *
+ *
* @param clusters
* cluster list
* @param clusterAppMap
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 192869a..3940c6d 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,20 +38,11 @@
static Logger logger = LoggerFactory.getLogger(Tools.class);
- enum Task {
- deployApp(DeployApp.class),
- deploy(Deploy.class),
- node(Main.class),
- addNodes(AddNodes.class),
- zkServer(ZKServer.class),
- newCluster(CreateCluster.class),
- adapter(null),
- genericAdapter(GenericEventAdapter.class),
- newApp(CreateApp.class),
- s4r(Package.class),
- status(S4Status.class),
- createTask(CreateTask.class),
- rebalanceTask(RebalanceTask.class);
+ enum Task {
+ deployApp(DeployApp.class), deploy(Deploy.class), node(Main.class), addNodes(AddNodes.class), zkServer(
+ ZKServer.class), newCluster(CreateCluster.class), adapter(null), genericAdapter(
+ GenericEventAdapter.class), newApp(CreateApp.class), s4r(Package.class), status(S4Status.class), createTask(
+ CreateTask.class), rebalanceTask(RebalanceTask.class);
Class<?> target;