blob: 3073211641b985a5b7ba1a772f48f2ac489e012f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder.output;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
public class OutputKafka extends Output<LogFeederProps, InputFileMarker> {
private static final Logger logger = LogManager.getLogger(OutputKafka.class);
private static final int FAILED_RETRY_INTERVAL = 30;
private static final int CATCHUP_RETRY_INTERVAL = 5;
private static final int DEFAULT_BATCH_SIZE = 5000;
private static final int DEFAULT_LINGER_MS = 1000;
private String topic = null;
private boolean isAsync = true;
private long messageCount = 0;
private KafkaProducer<String, String> producer = null;
private BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
// Let's start with the assumption Kafka is down
private boolean isKafkaBrokerUp = false;
private LogFeederProps logFeederProps;
@Override
public String getStatMetricName() {
return "output.kafka.write_logs";
}
@Override
public String getWriteBytesMetricName() {
return "output.kafka.write_bytes";
}
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
this.logFeederProps = logFeederProps;
Properties props = initProperties();
producer = creteKafkaProducer(props);
createKafkaRetryThread();
}
private Properties initProperties() throws Exception {
String brokerList = getStringValue("broker_list");
if (StringUtils.isEmpty(brokerList)) {
throw new Exception("For kafka output, bootstrap broker_list is needed");
}
topic = getStringValue("topic");
if (StringUtils.isEmpty(topic)) {
throw new Exception("For kafka output, topic is needed");
}
isAsync = getBooleanValue("is_async", true);
int batchSize = getIntValue("batch_size", DEFAULT_BATCH_SIZE);
int lingerMS = getIntValue("linger_ms", DEFAULT_LINGER_MS);
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("client.id", "logfeeder_producer");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("compression.type", "snappy");
props.put("batch.size", batchSize);
props.put("linger.ms", lingerMS);
for (String key : getConfigs().keySet()) {
if (key.startsWith("kafka.")) {
Object value = getConfigs().get(key);
if (value == null || value.toString().length() == 0) {
continue;
}
String kafkaKey = key.substring("kafka.".length());
logger.info("Adding custom Kafka property. " + kafkaKey + "=" + value);
props.put(kafkaKey, value);
}
}
return props;
}
protected KafkaProducer<String, String> creteKafkaProducer(Properties props) {
return new KafkaProducer<String, String>(props);
}
private void createKafkaRetryThread() {
Thread retryThread = new Thread("kafka-writer-retry,topic=" + topic) {
@Override
public void run() {
KafkaCallBack kafkaCallBack = null;
logger.info("Started thread to monitor failed messsages. " + getShortDescription());
while (true) {
try {
if (kafkaCallBack == null) {
kafkaCallBack = failedMessages.take();
}
if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
kafkaCallBack = null;
} else {
logger.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " +
FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
}
} catch (Throwable t) {
String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending message to Kafka during retry. message=" +
(kafkaCallBack == null ? null : kafkaCallBack.message), t, logger, Level.ERROR);
}
}
}
};
retryThread.setDaemon(true);
retryThread.start();
}
@Override
public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
while (!isDrain() && !inputMarker.getInput().isDrain()) {
try {
if (failedMessages.size() == 0) {
if (publishMessage(block, inputMarker)) {
break;
}
}
if (isDrain() || inputMarker.getInput().isDrain()) {
break;
}
if (!isKafkaBrokerUp) {
logger.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
} else {
logger.warn("Kafka is still catching up from previous failed messages. outstanding messages=" + failedMessages.size() +
" Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
}
} catch (Throwable t) {
// ignore
break;
}
}
}
@Override
public void setDrain(boolean drain) {
super.setDrain(drain);
}
public void flush() {
logger.info("Flush called...");
setDrain(true);
}
@Override
public void close() {
logger.info("Closing Kafka client...");
flush();
if (producer != null) {
try {
producer.close();
} catch (Throwable t) {
logger.error("Error closing Kafka topic. topic=" + topic);
}
}
logger.info("Closed Kafka client");
super.close();
}
private boolean publishMessage(String block, InputMarker inputMarker) {
if (isAsync && isKafkaBrokerUp) { // Send asynchronously
producer.send(new ProducerRecord<String, String>(topic, block), new KafkaCallBack(this, block, inputMarker, ++messageCount));
return true;
} else { // Send synchronously
try {
// Not using key. Let it round robin
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get();
if (metadata != null) {
statMetric.value++;
writeBytesMetric.value += block.length();
}
if (!isKafkaBrokerUp) {
logger.info("Started writing to kafka. " + getShortDescription());
isKafkaBrokerUp = true;
}
return true;
} catch (InterruptedException e) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT";
LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e, logger,
Level.ERROR);
} catch (ExecutionException e) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION";
LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e, logger,
Level.ERROR);
} catch (Throwable t) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR";
LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t, logger,
Level.ERROR);
}
}
return false;
}
@Override
public String getShortDescription() {
return "output:destination=kafka,topic=" + topic;
}
class KafkaCallBack implements Callback {
private long thisMessageNumber;
private OutputKafka output = null;
private String message;
private InputMarker inputMarker;
public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker, long messageCount) {
this.thisMessageNumber = messageCount;
this.output = output;
this.inputMarker = inputMarker;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (metadata != null) {
if (!output.isKafkaBrokerUp) {
logger.info("Started writing to kafka. " + output.getShortDescription());
output.isKafkaBrokerUp = true;
}
output.incrementStat(1);
output.writeBytesMetric.value += message.length();
} else {
output.isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", exception, logger,
Level.ERROR);
output.failedMessages.add(this);
}
}
}
@Override
public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
}
@Override
public String getOutputType() {
throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
}
@Override
public Long getPendingCount() {
return 0L;
}
}