/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.Services;
import org.apache.twill.internal.json.ILoggingEventSerializer;
import org.apache.twill.internal.json.LogThrowableCodec;
import org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.kafka.client.Compression;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaPublisher;
import org.apache.twill.zookeeper.RetryStrategies;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.twill.zookeeper.ZKClientServices;
import org.apache.twill.zookeeper.ZKClients;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* A logback {@link Appender} for writing log events to Kafka.
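*
* <p>A minimal programmatic wiring sketch; in practice this appender is typically configured through
* {@code logback.xml}, and the connect string, hostname, and topic below are illustrative placeholders
* rather than values required by this class:
*
* <pre>{@code
* KafkaAppender appender = new KafkaAppender();
* appender.setContext(loggerContext);        // an existing logback LoggerContext
* appender.setZookeeper("localhost:2181");   // placeholder ZooKeeper connect string
* appender.setHostname("worker-host");       // placeholder host tag for log events
* appender.setTopic("logs");                 // placeholder Kafka topic
* appender.start();
* }</pre>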
*/
public final class KafkaAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

private static final String PUBLISH_THREAD_NAME = "kafka-logger";
private final AtomicReference<KafkaPublisher.Preparer> publisher;
private final Runnable flushTask;

/**
* Approximate count of how many log entries are currently buffered. It is maintained on a
* best-effort basis and may differ slightly from the actual queue size.
*/
private final AtomicInteger bufferedSize;
private LogEventConverter eventConverter;
private ZKClientService zkClientService;
private KafkaClientService kafkaClient;
private String zkConnectStr;
private String hostname;
private String runnableName;
private String topic;
private Queue<String> buffer;
private int flushLimit = 20;
private int flushPeriod = 100;
private ScheduledExecutorService scheduler;

public KafkaAppender() {
publisher = new AtomicReference<>();
flushTask = createFlushTask();
bufferedSize = new AtomicInteger();
buffer = new ConcurrentLinkedQueue<>();
}

/**
* Sets the ZooKeeper connection string. Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setZookeeper(String zkConnectStr) {
this.zkConnectStr = zkConnectStr;
}

/**
* Sets the hostname to tag log events with. Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setHostname(String hostname) {
this.hostname = hostname;
}

/**
* Sets the runnable name to tag log events with. Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setRunnableName(String runnableName) {
this.runnableName = runnableName;
}

/**
* Sets the Kafka topic that logs are published to. Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setTopic(String topic) {
this.topic = topic;
}

/**
* Sets the maximum number of buffered log entries before a forced flush is triggered.
* Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setFlushLimit(int flushLimit) {
this.flushLimit = flushLimit;
}

/**
* Sets the periodic flush interval in milliseconds. Called by the logback configurator.
*/
@SuppressWarnings("unused")
public void setFlushPeriod(int flushPeriod) {
this.flushPeriod = flushPeriod;
}

@Override
public void start() {
Preconditions.checkNotNull(zkConnectStr);
eventConverter = new LogEventConverter(hostname, runnableName);
scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(PUBLISH_THREAD_NAME));
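// Wrap the ZooKeeper client so that watches are re-created when a session expires and failed
// operations are retried with a fixed one-second delay.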
zkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
kafkaClient = new ZKKafkaClientService(zkClientService);
Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
new FutureCallback<List<ListenableFuture<Service.State>>>() {
@Override
public void onSuccess(List<ListenableFuture<Service.State>> result) {
for (ListenableFuture<Service.State> future : result) {
Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
"Service is not running.");
}
addInfo("Kafka client started: " + zkConnectStr);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable t) {
// Failed to talk to Kafka. Other than logging the error, there is not much else that can be done.
addError("Failed to start kafka appender.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);
super.start();
}

@Override
public void stop() {
super.stop();
scheduler.shutdownNow();
Futures.getUnchecked(Services.chainStop(kafkaClient, zkClientService));
}
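
/**
* Forces an immediate flush of the buffered logs, waiting up to two seconds for the flush to complete.
*/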
public void forceFlush() {
try {
scheduler.submit(flushTask).get(2, TimeUnit.SECONDS);
} catch (Exception e) {
addError("Failed to force log flush in 2 seconds.", e);
}
}

@Override
protected void append(ILoggingEvent eventObject) {
buffer.offer(eventConverter.convert(eventObject));
if (bufferedSize.incrementAndGet() >= flushLimit && publisher.get() != null) {
// The buffer has grown past the flush limit; trigger an extra flush ahead of the periodic schedule.
scheduler.submit(flushTask);
}
}

/**
* Publishes buffered logs to Kafka, retrying failed attempts until the given timeout is exhausted.
*
* @return Number of logs published.
* @throws TimeoutException If the timeout was reached before the publish completed.
*/
private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
List<ByteBuffer> logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());
for (String json : Iterables.consumingIterable(buffer)) {
logs.add(Charsets.UTF_8.encode(json));
}
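// Back off between failed publish attempts for a tenth of the overall timeout, with a floor of one nanosecond.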
long backOffTime = timeoutUnit.toNanos(timeout) / 10;
if (backOffTime <= 0) {
backOffTime = 1;
}
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
long publishTimeout = timeout;
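// Keep retrying until the publish succeeds, the remaining timeout is used up, or the thread is interrupted.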
do {
try {
int published = doPublishLogs(logs).get(publishTimeout, timeoutUnit);
bufferedSize.addAndGet(-published);
return published;
} catch (ExecutionException e) {
addError("Failed to publish logs to Kafka.", e);
TimeUnit.NANOSECONDS.sleep(backOffTime);
publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
stopwatch.reset();
stopwatch.start();
}
} while (publishTimeout > 0);
} catch (InterruptedException e) {
addWarn("Logs publish to Kafka interrupted.", e);
}
return 0;
}

private ListenableFuture<Integer> doPublishLogs(Collection<ByteBuffer> logs) {
// Nothing to publish; simply return a completed future.
if (logs.isEmpty()) {
return Futures.immediateFuture(0);
}
// If the publisher is not available, try to create one.
KafkaPublisher.Preparer publisher = KafkaAppender.this.publisher.get();
if (publisher == null) {
try {
KafkaPublisher.Preparer preparer = kafkaClient.getPublisher(KafkaPublisher.Ack.LEADER_RECEIVED,
Compression.SNAPPY).prepare(topic);
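// compareAndSet ensures that callers racing to create the publisher settle on a single shared preparer.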
KafkaAppender.this.publisher.compareAndSet(null, preparer);
publisher = KafkaAppender.this.publisher.get();
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
}
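// Add every log entry with the same partition key so that all entries land in one partition
// and keep their relative order.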
for (ByteBuffer buffer : logs) {
publisher.add(buffer, 0);
}
return publisher.send();
}

/**
* Creates a {@link Runnable} that publishes all buffered logs to Kafka.
*
* @return The Runnable task.
*/
private Runnable createFlushTask() {
return new Runnable() {
@Override
public void run() {
try {
publishLogs(2L, TimeUnit.SECONDS);
} catch (Exception e) {
addError("Failed to push logs to Kafka. Log entries dropped.", e);
}
}
};
}

/**
* Helper class to convert an {@link ILoggingEvent} into a JSON string.
*/
private static final class LogEventConverter {
private final Gson gson;

private LogEventConverter(String hostname, String runnableName) {
gson = new GsonBuilder()
.registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec())
.registerTypeAdapter(LogThrowable.class, new LogThrowableCodec())
.registerTypeAdapter(ILoggingEvent.class, new ILoggingEventSerializer(hostname, runnableName))
.create();
}

private String convert(ILoggingEvent event) {
return gson.toJson(event, ILoggingEvent.class);
}
}
}