blob: ce41ecdd539dd6789759b2ba4e0049fdce762740 [file] [log] [blame]
package org.apache.s4.tools;
import java.util.Collections;
import java.util.List;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.model.IdealState;
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ClusterFromHelix;
import org.apache.s4.tools.DeployApp.DeployAppArgs;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
/**
 * Command-line tool that joins a Helix-managed cluster as a spectator and, once per
 * second, sends a "Hello world" {@link Event} to a randomly chosen partition of the
 * stream named on the command line.
 *
 * <p>The number of partitions is read once at start-up from the Helix ideal state
 * registered under the stream name. The tool runs until it fails or is interrupted.
 */
public class GenericEventAdapter {

    /** Helix instance name under which this tool connects. */
    private static final String INSTANCE_NAME = "adapter";

    /** Pause between two consecutive events, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 1000;

    /**
     * Parses the command line, connects to Helix and emits events in an endless loop.
     *
     * <p>Failures are reported on standard error via a stack trace; the Helix
     * connection is released before the method returns.
     *
     * @param args command-line arguments, see {@link AdapterArgs}
     */
    public static void main(String[] args) {
        AdapterArgs adapterArgs = new AdapterArgs();
        Tools.parseArgs(adapterArgs, args);

        HelixManager manager = null;
        try {
            manager = HelixManagerFactory.getZKHelixManager(adapterArgs.clusterName, INSTANCE_NAME,
                    InstanceType.SPECTATOR, adapterArgs.zkConnectionString);
            // NOTE(review): 30 and 60 are passed through unchanged from the original code;
            // presumably timeouts -- confirm against the ClusterFromHelix constructor.
            ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
                    30, 60);
            manager.connect();
            manager.addExternalViewChangeListener(cluster);

            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
            Builder keyBuilder = helixDataAccessor.keyBuilder();
            IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(adapterArgs.streamName));
            if (idealState == null) {
                // Without this check an unknown stream name surfaces as a bare NullPointerException.
                throw new IllegalStateException("No ideal state found for stream '" + adapterArgs.streamName
                        + "' in cluster '" + adapterArgs.clusterName + "'");
            }
            int numPartitions = idealState.getNumPartitions();
            if (numPartitions <= 0) {
                // Guards the partition selection below against a zero or negative bound.
                throw new IllegalStateException("Stream '" + adapterArgs.streamName
                        + "' reports an invalid partition count: " + numPartitions);
            }

            // NOTE(review): 1000 is passed through unchanged; presumably a timeout in ms -- confirm.
            TCPEmitter emitter = new TCPEmitter(cluster, 1000);
            // The serializer does not depend on the event, so build it once instead of per iteration.
            KryoSerDeser serializer = new KryoSerDeser();

            while (true) {
                // Math.random() is in [0, 1), so this covers every partition in [0, numPartitions)
                // uniformly, including partition ids >= 1000.
                int partitionId = (int) (Math.random() * numPartitions);
                Event event = new Event();
                event.put("name", String.class, "Hello world to partition:" + partitionId);
                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
                System.out.println("Sending event to partition:" + partitionId);
                emitter.send(partitionId, message);
                Thread.sleep(SEND_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller/JVM can observe the shutdown request.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (manager != null && manager.isConnected()) {
                manager.disconnect();
            }
        }
    }

    /**
     * Command-line options accepted by {@link GenericEventAdapter}.
     */
    // NOTE(review): commandNames/commandDescription look copied from another tool
    // ("newStreamProcessor"); left unchanged because other tooling may rely on them -- confirm.
    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
    static class AdapterArgs extends S4ArgsBase {

        /** ZooKeeper connection string; defaults to a local ZooKeeper. */
        @Parameter(names = "-zk", description = "ZooKeeper connection string")
        String zkConnectionString = "localhost:2181";

        /** Name of the cluster to connect to (required). */
        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
        String clusterName;

        /** Name of the stream the events are sent to (required). */
        @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
        String streamName;
    }
}