blob: 6ee2143de64318dd40cf0cedccb1babad0164abe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ibm.matos;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.google.gson.JsonObject;
public class Load {
private static Logger logger = Logger.getLogger(Load.class);
private static boolean done = false;
private static long lastOffset;
private static Config config;
// main method for Whisk action
public static JsonObject main(JsonObject args) {
try {
Utils.initDirs();
Utils.extractResourcesToFilesystem(true);
config = new Config();
config.overrideProperties(args);
// invoke the "real" main method, shared with Java main
doMain();
} catch (Exception e) {
e.printStackTrace();
}
JsonObject response = new JsonObject();
response.addProperty("last", getLastOffset());
return response;
}
public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
Utils.initDirs();
Utils.extractResourcesToFilesystem(true);
if (args.length == 3) {
config = new Config(args[0]);
HashMap<String,String> amap = new HashMap<String,String>();
amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
amap.put(Config.KAFKA_NUM_RECORDS_PROP, args[2]);
config.overrideProperties(amap);
} else {
logger.log(Level.ERROR, "Usage:\n\n" +
"java -jar <name_of_jar>.jar <config-json-file-name> " +
"<kafka_api_key> <kafka_num_records>");
return;
}
// invoke the "real" main method, shared with Whisk's main
doMain();
}
public static void doMain() throws InterruptedException, ExecutionException {
KafkaProducer<byte[], byte[]> kafkaProducer;
String kafkaHost;
String apiKey;
String topic;
int numRecords;
int producedMessages;
logger.log(Level.INFO, "Starting " + Load.class.getSimpleName() + "; CONFIG:");
logger.log(Level.INFO, config);
Utils.setJaasLocation();
kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
apiKey = config.get(Config.KAFKA_API_KEY_PROP);
topic = config.get(Config.KAFKA_TOPIC_PROP);
numRecords = Integer.parseInt(config.get(Config.KAFKA_NUM_RECORDS_PROP));
Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
kafkaProducer = new KafkaProducer<byte[], byte[]>(
Utils.getClientConfiguration(kafkaHost, true));
done = false;
producedMessages = 0;
lastOffset = -1;
Future<RecordMetadata> fm = null;
while (!done) {
String fieldName = "records";
// Push a message into the list to be sent.
MessageList list = new MessageList();
long now = System.currentTimeMillis();
list.push("This is a test message[" + producedMessages + "] ["
+ new Date(now).toString()
+ "|" + now + "]");
try {
// Create a producer record which will be sent
// to the Message Hub service, providing the topic
// name, field name and message. The field name and
// message are converted to UTF-8.
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic,
fieldName.getBytes("UTF-8"), list.toString().getBytes("UTF-8"));
// keep the metadata of the last produced message
fm = kafkaProducer.send(record);
producedMessages++;
if(producedMessages >= numRecords) {
done = true;
}
} catch (final Exception e) {
e.printStackTrace();
done = true;
}
}
// wait until last message has been sent, retrieve its offset
RecordMetadata m = fm.get();
logger.log(Level.INFO, "[" + producedMessages + " messages sent, last offset: " + m.offset() + "]");
lastOffset = m.offset()+1; // 'next after last' offset
logger.log(Level.INFO, Load.class.toString() + " is shutting down.");
kafkaProducer.close();
System.out.println("Last offset: " + getLastOffset());
}
public static long getLastOffset() {
return lastOffset;
}
}