blob: 66c919d793eef0df8494d461552fcdeba2984270 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.tools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Function;

import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.samza.tools.schemas.PageViewEvent;
import org.apache.samza.tools.schemas.ProfileChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Command line tool that generates random Avro-encoded events (PageView or ProfileChange)
 * and publishes them to a Kafka topic. Used to feed sample data into Samza jobs.
 *
 * <p>Usage: {@code generate-events.sh -t TOPIC [-b BROKER] [-n NUM_EVENTS] [-e EVENT_TYPE]}
 */
public class GenerateKafkaEvents {

  private static final String OPT_SHORT_TOPIC_NAME = "t";
  private static final String OPT_LONG_TOPIC_NAME = "topic";
  private static final String OPT_ARG_TOPIC_NAME = "TOPIC_NAME";
  private static final String OPT_DESC_TOPIC_NAME = "Name of the topic to write events to.";

  private static final String OPT_SHORT_BROKER = "b";
  private static final String OPT_LONG_BROKER = "broker";
  private static final String OPT_ARG_BROKER = "BROKER";
  private static final String OPT_DESC_BROKER = "Kafka broker endpoint.";
  private static final String DEFAULT_BROKER = "localhost:9092";

  private static final String OPT_SHORT_NUM_EVENTS = "n";
  private static final String OPT_LONG_NUM_EVENTS = "numEvents";
  private static final String OPT_ARG_NUM_EVENTS = "NUM_EVENTS";
  private static final String OPT_DESC_NUM_EVENTS = "Number of events to be produced.";

  private static final String OPT_SHORT_EVENT_TYPE = "e";
  private static final String OPT_LONG_EVENT_TYPE = "eventtype";
  private static final String OPT_ARG_EVENT_TYPE = "EVENT_TYPE";
  private static final String OPT_DESC_EVENT_TYPE =
      "Type of the event (PageView|ProfileChange) Default(ProfileChange).";

  private static final Logger LOG = LoggerFactory.getLogger(GenerateKafkaEvents.class);

  // Seeded in main(); all event generators draw their random field values from here.
  private static RandomValueGenerator randValueGenerator;

  private static final String[] COMPANIES =
      new String[]{"Microsoft", "LinkedIn", "Google", "Facebook", "Amazon", "Apple", "Twitter", "Snap"};

  private static final String PAGEVIEW_EVENTTYPE = "pageview";

  /**
   * Entry point: parses the command line and produces the requested events.
   *
   * @param args command line arguments (see class doc for options)
   * @throws UnsupportedEncodingException retained for signature compatibility
   * @throws InterruptedException if interrupted while sleeping between continuous sends
   */
  public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException {
    randValueGenerator = new RandomValueGenerator(System.currentTimeMillis());

    Options options = new Options();
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_TOPIC_NAME, OPT_LONG_TOPIC_NAME, OPT_ARG_TOPIC_NAME, true, OPT_DESC_TOPIC_NAME));
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_BROKER, OPT_LONG_BROKER, OPT_ARG_BROKER, false, OPT_DESC_BROKER));
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_NUM_EVENTS, OPT_LONG_NUM_EVENTS, OPT_ARG_NUM_EVENTS, false, OPT_DESC_NUM_EVENTS));
    options.addOption(
        CommandLineHelper.createOption(OPT_SHORT_EVENT_TYPE, OPT_LONG_EVENT_TYPE, OPT_ARG_EVENT_TYPE, false, OPT_DESC_EVENT_TYPE));

    CommandLineParser parser = new BasicParser();
    CommandLine cmd;
    try {
      cmd = parser.parse(options, args);
    } catch (Exception e) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(String.format("Error: %s%ngenerate-events.sh", e.getMessage()), options);
      return;
    }

    String topicName = cmd.getOptionValue(OPT_SHORT_TOPIC_NAME);
    String broker = cmd.getOptionValue(OPT_SHORT_BROKER, DEFAULT_BROKER);
    // No -n means "produce forever" (Long.MAX_VALUE sentinel, throttled below).
    long numEvents = Long.parseLong(cmd.getOptionValue(OPT_SHORT_NUM_EVENTS, String.valueOf(Long.MAX_VALUE)));
    String eventType = cmd.getOptionValue(OPT_SHORT_EVENT_TYPE);
    generateEvents(broker, topicName, eventType, numEvents);
  }

  /**
   * Produces {@code numEvents} randomly generated events to {@code topicName}.
   *
   * @param brokers Kafka bootstrap servers (host:port)
   * @param topicName topic to publish to
   * @param eventType event type; any value containing "pageview" (case-insensitive) selects
   *                  PageView events, anything else — including null — selects ProfileChange
   * @param numEvents number of events to produce; Long.MAX_VALUE means produce continuously
   * @throws UnsupportedEncodingException retained for signature compatibility
   * @throws InterruptedException if interrupted while sleeping between continuous sends
   */
  private static void generateEvents(String brokers, String topicName, String eventType, long numEvents)
      throws UnsupportedEncodingException, InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    props.put("retries", 100);
    props.put("batch.size", 16384);
    props.put("key.serializer", ByteArraySerializer.class.getCanonicalName());
    props.put("value.serializer", ByteArraySerializer.class.getCanonicalName());

    // BUGFIX: -e is optional, so eventType may be null; the documented default is ProfileChange.
    Function<Integer, Pair<String, byte[]>> eventGenerator;
    if (eventType != null && eventType.toLowerCase().contains(PAGEVIEW_EVENTTYPE)) {
      eventGenerator = GenerateKafkaEvents::generatePageViewEvent;
    } else {
      eventGenerator = GenerateKafkaEvents::generateProfileChangeEvent;
    }

    // Sleep between sends only when events are produced continuously (no -n given).
    boolean doSleep = numEvents == Long.MAX_VALUE;

    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      for (int index = 0; index < numEvents; index++) {
        // BUGFIX: capture the actual loop index for the callback (was hard-coded to 0).
        final int finalIndex = index;
        Pair<String, byte[]> record = eventGenerator.apply(index);
        producer.send(new ProducerRecord<>(topicName, record.getLeft().getBytes(StandardCharsets.UTF_8), record.getRight()),
            (metadata, exception) -> {
              if (exception == null) {
                LOG.info("send completed for event {} at offset {}", finalIndex, metadata.offset());
              } else {
                // Throwing from a producer callback is swallowed by the Kafka I/O thread;
                // log the failure with its cause instead.
                LOG.error("Failed to send event {}.", finalIndex, exception);
              }
            });
        System.out.println(String.format("Published event %d to topic %s", index, topicName));
        if (doSleep) {
          Thread.sleep(1000);
        }
      }
      producer.flush();
    }
  }

  /**
   * Builds a random ProfileChangeEvent keyed by the generated profile name.
   *
   * @param index event sequence number (unused by this generator)
   * @return pair of (record key, Avro-encoded event bytes)
   */
  private static Pair<String, byte[]> generateProfileChangeEvent(Integer index) {
    ProfileChangeEvent event = new ProfileChangeEvent();
    String name = randValueGenerator.getNextString(10, 20);
    event.Name = name;
    event.NewCompany = COMPANIES[randValueGenerator.getNextInt(0, COMPANIES.length - 1)];
    event.OldCompany = COMPANIES[randValueGenerator.getNextInt(0, COMPANIES.length - 1)];
    event.ProfileChangeTimestamp = System.currentTimeMillis();
    byte[] value;
    try {
      value = encodeAvroSpecificRecord(ProfileChangeEvent.class, event);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return new ImmutablePair<>(name, value);
  }

  /**
   * Encode an Avro record into byte array
   *
   * @param clazz The class type of the avro record
   * @param record the instance of the avro record
   * @param <T> The type of the avro record.
   * @return encoded bytes
   * @throws IOException on I/O errors encoding the avro record
   */
  public static <T> byte[] encodeAvroSpecificRecord(Class<T> clazz, T record) throws IOException {
    DatumWriter<T> msgDatumWriter = new SpecificDatumWriter<>(clazz);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(os, null);
    msgDatumWriter.write(record, encoder);
    encoder.flush();
    return os.toByteArray();
  }

  /**
   * Builds a random PageViewEvent keyed by the generated page name.
   *
   * @param index event sequence number (unused by this generator)
   * @return pair of (record key, Avro-encoded event bytes)
   */
  private static Pair<String, byte[]> generatePageViewEvent(int index) {
    PageViewEvent event = new PageViewEvent();
    String name = randValueGenerator.getNextString(10, 20);
    event.id = randValueGenerator.getNextInt();
    event.Name = name;
    event.ViewerName = randValueGenerator.getNextString(10, 20);
    event.ProfileViewTimestamp = System.currentTimeMillis();
    byte[] value;
    try {
      value = encodeAvroSpecificRecord(PageViewEvent.class, event);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return new ImmutablePair<>(name, value);
  }
}