blob: 597bc031bd0512d7d069a24f96593426d87ff9a8 [file] [log] [blame]
package io.druid.examples.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
/**
*
*/
public class RabbitMQProducerMain
{
public static void main(String[] args)
throws Exception
{
// We use a List to keep track of option insertion order. See below.
final List<Option> optionList = new ArrayList<Option>();
optionList.add(OptionBuilder.withLongOpt("help")
.withDescription("display this help message")
.create("h"));
optionList.add(OptionBuilder.withLongOpt("hostname")
.hasArg()
.withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
.create("b"));
optionList.add(OptionBuilder.withLongOpt("port")
.hasArg()
.withDescription("the port of the AMQP broker [defaults to AMQP library default]")
.create("n"));
optionList.add(OptionBuilder.withLongOpt("username")
.hasArg()
.withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
.create("u"));
optionList.add(OptionBuilder.withLongOpt("password")
.hasArg()
.withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
.create("p"));
optionList.add(OptionBuilder.withLongOpt("vhost")
.hasArg()
.withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
.create("v"));
optionList.add(OptionBuilder.withLongOpt("exchange")
.isRequired()
.hasArg()
.withDescription("name of the AMQP exchange [required - no default]")
.create("e"));
optionList.add(OptionBuilder.withLongOpt("key")
.hasArg()
.withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
.create("k"));
optionList.add(OptionBuilder.withLongOpt("type")
.hasArg()
.withDescription("the type of exchange to create [default: 'topic']")
.create("t"));
optionList.add(OptionBuilder.withLongOpt("durable")
.withDescription("if set, a durable exchange will be declared [default: not set]")
.create("d"));
optionList.add(OptionBuilder.withLongOpt("autodelete")
.withDescription("if set, an auto-delete exchange will be declared [default: not set]")
.create("a"));
optionList.add(OptionBuilder.withLongOpt("single")
.withDescription("if set, only a single message will be sent [default: not set]")
.create("s"));
optionList.add(OptionBuilder.withLongOpt("start")
.hasArg()
.withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
.create());
optionList.add(OptionBuilder.withLongOpt("stop")
.hasArg()
.withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
.create());
optionList.add(OptionBuilder.withLongOpt("interval")
.hasArg()
.withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
.create());
optionList.add(OptionBuilder.withLongOpt("delay")
.hasArg()
.withDescription("the delay between sending messages in milliseconds [default: 100]")
.create());
// An extremely silly hack to maintain the above order in the help formatting.
HelpFormatter formatter = new HelpFormatter();
// Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
formatter.setOptionComparator(new Comparator(){
@Override
public int compare(Object o1, Object o2)
{
// I know this isn't fast, but who cares! The list is short.
return optionList.indexOf(o1) - optionList.indexOf(o2);
}
});
// Now we can add all the options to an Options instance. This is dumb!
Options options = new Options();
for (Option option : optionList) {
options.addOption(option);
}
CommandLine cmd = null;
try{
cmd = new BasicParser().parse(options, args);
}
catch(ParseException e){
formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
System.exit(1);
}
if(cmd.hasOption("h")) {
formatter.printHelp("RabbitMQProducerMain", options);
System.exit(2);
}
ConnectionFactory factory = new ConnectionFactory();
if(cmd.hasOption("b")){
factory.setHost(cmd.getOptionValue("b"));
}
if(cmd.hasOption("u")){
factory.setUsername(cmd.getOptionValue("u"));
}
if(cmd.hasOption("p")){
factory.setPassword(cmd.getOptionValue("p"));
}
if(cmd.hasOption("v")){
factory.setVirtualHost(cmd.getOptionValue("v"));
}
if(cmd.hasOption("n")){
factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
}
String exchange = cmd.getOptionValue("e");
String routingKey = "default.routing.key";
if(cmd.hasOption("k")){
routingKey = cmd.getOptionValue("k");
}
boolean durable = cmd.hasOption("d");
boolean autoDelete = cmd.hasOption("a");
String type = cmd.getOptionValue("t", "topic");
boolean single = cmd.hasOption("single");
int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));
Random r = new Random();
Calendar timer = Calendar.getInstance();
timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));
String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchange, type, durable, autoDelete, null);
do{
int wp = (10 + r.nextInt(90)) * 100;
String gender = r.nextBoolean() ? "male" : "female";
int age = 20 + r.nextInt(70);
String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);
channel.basicPublish(exchange, routingKey, null, line.getBytes());
System.out.println("Sent message: " + line);
timer.add(Calendar.SECOND, interval);
Thread.sleep(delay);
}while((!single && stop.after(timer.getTime())));
connection.close();
}
}