/**
* Copyright 2015 IBM
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed Materials - Property of IBM
* (c) Copyright IBM Corp. 2015
*/
package com.ibm.matos;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.google.gson.JsonObject;
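/**
 * Reads a bounded range of messages from a single Kafka topic partition
 * and uploads the collected payload to OpenStack Swift object storage.
 * Runs either as an OpenWhisk action (via main(JsonObject)) or as a
 * standalone Java program (via main(String[])).
 */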
public class Batch {
private static final Logger logger = Logger.getLogger(Batch.class);
private static KafkaConsumer<byte[], byte[]> kafkaConsumer;
private static TopicPartition tp;
private static boolean done;
private static final String KAFKA_CONSUMER_ID_KEY = "group.id";
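// give up if the requested offset range has not been fully fetched within this many seconds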
private static final long FETCH_TIMEOUT_SEC = 60;
private static long startOffset, endOffset;
private static BatchAppender processor;
private static Config config;
// main method for Whisk action
public static JsonObject main(JsonObject args) {
try {
Utils.initDirs();
Utils.extractResourcesToFilesystem(false);
config = new Config();
config.overrideProperties(args);
// invoke the "real" main method, shared with Java main
doMain();
} catch (Exception e) {
logger.log(Level.ERROR, "Batch execution failed", e);
}
JsonObject response = new JsonObject();
response.addProperty("offsets", getOffsets());
return response;
}
// Java main class, reading arguments from command line and invoking doMain()
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
Utils.initDirs();
Utils.extractResourcesToFilesystem(false);
if (args.length >= 5 && args.length <= 8) {
config = new Config(args[0]);
HashMap<String, String> amap = new HashMap<>();
amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
amap.put(Config.SWIFT_TENANT_ID_PROP, args[2]);
amap.put(Config.SWIFT_USER_ID_PROP, args[3]);
amap.put(Config.SWIFT_PASSWORD_PROP, args[4]);
if (args.length >= 6)
amap.put(Config.KAFKA_PARTITION_PROP, args[5]);
if (args.length >= 7)
amap.put(Config.KAFKA_START_OFFSET_PROP, args[6]);
if (args.length >= 8)
amap.put(Config.KAFKA_END_OFFSET_PROP, args[7]);
config.overrideProperties(amap);
} else {
logger.log(Level.ERROR, "Usage:\n\n" +
"java -jar <name_of_jar>.jar <config-json-file-name> " +
"<kafka_api_key> <swift_tenant_id> <swift_user_id> <swift_password> " +
"[<kafka_partition> [<kafka_start_offset> [<kafka_end_offset>]]]");
return;
}
// invoke the "real" main method, shared with Whisk's main
doMain();
}
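/**
 * Shared main logic: configures a Kafka consumer assigned to a single
 * topic partition, fetches the requested offset range, commits the final
 * offset, and stores the collected messages in object storage.
 */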
private static void doMain() throws IOException, InterruptedException {
String consumerGroup;
logger.log(Level.INFO, "Starting " + Batch.class.getSimpleName() + "; CONFIG:");
logger.log(Level.INFO, config);
Utils.setJaasLocation();
String apiKey = config.get(Config.KAFKA_API_KEY_PROP);
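// the first 16 characters of the API key are the user name; the remainder is the password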
Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
consumerGroup = config.get(Config.KAFKA_CONSUMER_ID_PROP);
startOffset = Long.parseLong(config.get(Config.KAFKA_START_OFFSET_PROP));
endOffset = Long.parseLong(config.get(Config.KAFKA_END_OFFSET_PROP));
String broker = config.get(Config.KAFKA_BROKER_PROP);
String topic = config.get(Config.KAFKA_TOPIC_PROP);
int partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
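// build the Kafka client configuration and set the consumer group id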
Properties props = Utils.getClientConfiguration(broker, false);
props.put(KAFKA_CONSUMER_ID_KEY, consumerGroup);
// initialize Kafka consumer
kafkaConsumer = new KafkaConsumer<byte[], byte[]>(props, new ByteArrayDeserializer(),
new ByteArrayDeserializer());
tp = new TopicPartition(topic, partition);
logger.log(Level.INFO, "Assigning topic-partition: " + tp);
kafkaConsumer.assign(Collections.singletonList(tp));
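// a negative start offset means: continue from the consumer's current (committed) position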
if (startOffset >= 0) {
logger.log(Level.INFO, "Rewinding " + tp + " to " + startOffset);
kafkaConsumer.seek(tp, startOffset);
} else {
logger.log(Level.INFO, "Starting with current offset");
}
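// BatchAppender collects record payloads and tracks the first and last offsets it has seen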
processor = new BatchAppender();
done = false;
int received = 0;
long startTimeSec = System.currentTimeMillis() / 1000;
kafkaConsumer.poll(0); // resolve the consumer's position (possibly redundant: position() below would also resolve it)
startOffset = kafkaConsumer.position(tp);
if (endOffset < 0) { // get last offset
// rewind to end, get offset, then rewind back
kafkaConsumer.seekToEnd(Collections.singletonList(tp));
endOffset = kafkaConsumer.position(tp); // returns the 'next after last' offset
kafkaConsumer.seek(tp, startOffset);
}
logger.log(Level.INFO, "Offsets to read: [" + startOffset + "," + endOffset +"]");
while (!done) {
// Poll the Kafka consumer, waiting up to 100 ms for records.
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
logger.log(Level.INFO, "Retrieved " + records.count() + " records");
if (records.count() > 0) {
// we might need less than we have in the buffer
// considering also records received in previous iterations
int last = Math.min(records.count(),
(int) (endOffset - startOffset - received));
processor.processRecords(records.records(tp).subList(0, last), tp);
}
received += records.count();
if (startOffset + received >= endOffset) {
logger.log(Level.INFO, "Setting offset of " + tp + "(group=" + consumerGroup + ") to " + endOffset);
kafkaConsumer.seek(tp, endOffset);
kafkaConsumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(endOffset)));
done = true;
} else {
if (System.currentTimeMillis() / 1000 - startTimeSec > FETCH_TIMEOUT_SEC) {
String err = "TIMEOUT fetching from " + tp + ": expected " + (endOffset - startOffset)
+ " messages with offsets [" + startOffset + ".." + endOffset + ")"
+ " but received only " + received + " messages";
logger.log(Level.ERROR, err);
shutdown();
}
}
}
kafkaConsumer.close();
// Store the retrieved messages into Object Storage
if (processor.getLast() > processor.getFirst()) {
BatchObStor obstor = new BatchObStor(config);
final String obstorPath =
"matos/" + System.currentTimeMillis() + "_"
+ processor.getFirst() + "-" + processor.getLast() + ".txt";
obstor.uploadFile(obstorPath, processor.getBytes());
}
System.out.println("Offsets: " + getOffsets());
}
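// Returns the processed offset range, e.g. "[100..200]", or "[none]" if nothing was processed.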
private static String getOffsets() {
if (processor == null) {
return "[none]";
}
return "[" + processor.getFirst() + ".." + processor.getLast() + "]";
}
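// Signals the fetch loop to exit on its next iteration.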
private static void shutdown() {
done = true;
}
}