blob: 63dfa12ca7a4c2881d82c56cc12fdeea3f194814 [file] [log] [blame]
/**
* Copyright 2015 IBM
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed Materials - Property of IBM
* (c) Copyright IBM Corp. 2015
*/
package com.ibm.matos;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.google.gson.JsonObject;
public class Monitor {
private static Logger logger = Logger.getLogger(Monitor.class);
private static long lastOffset;
private static long committedOffset;
private static Config config;
private final static String KAFKA_CONSUMER_ID_KEY = "group.id";
// main method for Whisk action
public static JsonObject main(JsonObject args) {
try {
Utils.initDirs();
Utils.extractResourcesToFilesystem(false);
config = new Config();
config.overrideProperties(args);
// invoke the "real" main method, shared with Java main
doMain(true);
} catch (Exception e) {
e.printStackTrace();
}
JsonObject response = new JsonObject();
response.addProperty("last", getLastOffset());
response.addProperty("committed", getCommittedOffset());
return response;
}
public static void main(String args[]) throws InterruptedException, ExecutionException, IOException {
Utils.initDirs();
Utils.extractResourcesToFilesystem(false);
if (args.length == 2 || args.length == 3) {
config = new Config(args[0]);
HashMap<String,String> amap = new HashMap<String,String>();
amap.put(Config.KAFKA_API_KEY_PROP, args[1]);
if (args.length == 3) {
amap.put(Config.KAFKA_PARTITION_PROP, args[2]);
}
config.overrideProperties(amap);
} else {
logger.log(Level.ERROR, "Usage:\n\n" +
"java -jar <name_of_jar>.jar <config-json-file-name> " +
"<kafka_api_key> [<kafka_partition>]");
return;
}
// invoke the "real" main method, shared with Whisk's main
doMain(false);
}
private static void doMain(boolean once) throws InterruptedException {
String kafkaHost;
String apiKey;
String topic;
int partition;
String consumerId;
boolean done = false;
Utils.setJaasLocation();
logger.log(Level.INFO, "Starting " + Monitor.class.getSimpleName() + "; CONFIG:");
logger.log(Level.INFO, config);
kafkaHost = config.get(Config.KAFKA_BROKER_PROP);
apiKey = config.get(Config.KAFKA_API_KEY_PROP);
topic = config.get(Config.KAFKA_TOPIC_PROP);
partition = Integer.parseInt(config.get(Config.KAFKA_PARTITION_PROP));
consumerId = config.get(Config.KAFKA_CONSUMER_ID_PROP);
Utils.updateJaasConfiguration(apiKey.substring(0, 16), apiKey.substring(16));
TopicPartition tp = new TopicPartition(topic, partition);
// configure Kafka consumer that will be used to retrieve the last offset
Properties propsLast = Utils.getClientConfiguration(kafkaHost, false);
KafkaConsumer<byte[], byte[]> kafkaConsumerLast = new KafkaConsumer<byte[],
byte[]>(propsLast, new ByteArrayDeserializer(), new ByteArrayDeserializer());
// configure Kafka consumer that will be used to retrieve the committed offset
Properties propsCommitted = Utils.getClientConfiguration(kafkaHost, false);
propsCommitted.put(KAFKA_CONSUMER_ID_KEY, consumerId);
KafkaConsumer<byte[], byte[]> kafkaConsumerCommitted = new KafkaConsumer<byte[],
byte[]>(propsCommitted, new ByteArrayDeserializer(), new ByteArrayDeserializer());
lastOffset = committedOffset = -1;
while (!done) {
// retrieve last and committed offset in the topic-partition
updateLastOffset(kafkaConsumerLast, tp);
updateCommittedOffset(kafkaConsumerCommitted, tp);
System.out.print("[" + getCommittedOffset() + "," + getLastOffset() + "]");
if (once) {
done = true;
} else {
Thread.sleep(5000);
}
}
kafkaConsumerLast.close();
kafkaConsumerCommitted.close();
logger.log(Level.INFO, "Shutting down " + Monitor.class.getSimpleName());
}
private static void updateLastOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
kafkaConsumer.assign(Collections.singletonList(tp));
kafkaConsumer.seekToEnd(Collections.singletonList(tp));
lastOffset = kafkaConsumer.position(tp);
logger.log(Level.INFO, "Retrieved last offset: " + lastOffset);
}
private static void updateCommittedOffset(KafkaConsumer<?, ?> kafkaConsumer, TopicPartition tp) {
// does not require assignment
OffsetAndMetadata committed = kafkaConsumer.committed(tp);
logger.log(Level.INFO, "Position of " + tp + ": " + committed);
committedOffset = (committed != null ? committed.offset() : -1);
}
private static long getLastOffset() {
return lastOffset;
}
private static long getCommittedOffset() {
return committedOffset;
}
}