blob: 557ed6e2638edc8bfe10708b7d193c1133f4d2d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.redis.appender;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.StringLayout;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.plugins.Node;
import org.apache.logging.log4j.plugins.Plugin;
import org.apache.logging.log4j.plugins.PluginBuilderAttribute;
import org.apache.logging.log4j.plugins.PluginElement;
import org.apache.logging.log4j.plugins.PluginFactory;
import org.apache.logging.log4j.plugins.validation.constraints.Required;
import org.apache.logging.log4j.core.net.ssl.SslConfiguration;
import org.apache.logging.log4j.spi.AbstractLogger;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Sends log events to a Redis key as a List. All logs are appended to Redis lists via the RPUSH command at keys defined
* in the configuration.
*/
@Plugin(name = "Redis", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
public final class RedisAppender extends AbstractAppender {
// The default port here is the default port for Redis generally.
// For more details, see the full configuration: http://download.redis.io/redis-stable/redis.conf
private static final int DEFAULT_REDIS_PORT = 6379;
private static final String DEFAULT_REDIS_KEYS = "log-events";
private static final int DEFAULT_APPENDER_QUEUE_CAPACITY = 20;
private final RedisManager manager;
private final boolean immediateFlush;
private final LinkedBlockingQueue<String> logQueue;
private RedisAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
final boolean ignoreExceptions, boolean immediateFlush, final int queueCapacity, final RedisManager manager) {
super(name, filter, layout, ignoreExceptions);
this.manager = Objects.requireNonNull(manager, "Redis Manager");
this.immediateFlush = immediateFlush;
this.logQueue = new LinkedBlockingQueue<>(queueCapacity);
}
/**
* Builds RedisAppender instances.
* @param <B> The type to build
*/
public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
implements org.apache.logging.log4j.plugins.util.Builder<RedisAppender> {
private final String KEY_SEPARATOR = ",";
@PluginBuilderAttribute("host")
@Required(message = "No Redis hostname provided")
private String host;
@PluginBuilderAttribute("keys")
private String keys = DEFAULT_REDIS_KEYS;
@PluginBuilderAttribute("port")
private int port = DEFAULT_REDIS_PORT;
@PluginBuilderAttribute("immediateFlush")
private boolean immediateFlush = true;
@PluginBuilderAttribute("queueCapacity")
private int queueCapacity = DEFAULT_APPENDER_QUEUE_CAPACITY;
@PluginElement("SslConfiguration")
private SslConfiguration sslConfiguration;
@PluginElement("RedisPoolConfiguration")
private LoggingRedisPoolConfiguration poolConfiguration = LoggingRedisPoolConfiguration.defaultConfiguration();
@SuppressWarnings("resource")
@Override
public RedisAppender build() {
return new RedisAppender(
getName(),
getLayout(),
getFilter(),
isIgnoreExceptions(),
isImmediateFlush(),
getQueueCapacity(),
getRedisManager()
);
}
String getKeys() {
return keys;
}
String getHost() {
return host;
}
int getQueueCapacity() {
return queueCapacity;
}
boolean isImmediateFlush() {
return immediateFlush;
}
SslConfiguration getSslConfiguration() {
return sslConfiguration;
}
LoggingRedisPoolConfiguration getPoolConfiguration() {
return poolConfiguration;
}
int getPort() {
return port;
}
public B setKeys(final String keys) {
this.keys = keys;
return asBuilder();
}
public B setHost(final String host) {
this.host = host;
return asBuilder();
}
public B setPort(final int port) {
this.port = port;
return asBuilder();
}
public B setQueueCapacity(final int queueCapacity) {
this.queueCapacity = queueCapacity;
return asBuilder();
}
public B setPoolConfiguration(final LoggingRedisPoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
return asBuilder();
}
public B setSslConfiguration(final SslConfiguration ssl) {
this.sslConfiguration = ssl;
return asBuilder();
}
public B setImmediateFlush(final boolean immediateFlush) {
this.immediateFlush = immediateFlush;
return asBuilder();
}
RedisManager getRedisManager() {
return new RedisManager(
getConfiguration().getLoggerContext(),
getName(),
getKeys().split(KEY_SEPARATOR),
getHost(),
getPort(),
getSslConfiguration(),
getPoolConfiguration()
);
}
}
/**
* Creates a builder for a RedisAppender.
* @return a builder for a RedisAppender.
*/
@PluginFactory
public static <B extends Builder<B>> B newBuilder() {
return new Builder<B>().asBuilder();
}
@Override
public void append(final LogEvent event) {
final Layout<? extends Serializable> layout = getLayout();
if (event.getLoggerName() != null && AbstractLogger.getRecursionDepth() > 1) {
LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
} else if (layout instanceof StringLayout) {
String serializedEvent = ((StringLayout)layout).toSerializable(event);
while (!logQueue.offer(serializedEvent)) {
tryFlushQueue();
}
if (shouldFlushLogQueue(event.isEndOfBatch())) {
tryFlushQueue();
}
} else {
throw new AppenderLoggingException("The Redis appender only supports StringLayouts.");
}
}
private boolean shouldFlushLogQueue(boolean endOfBatch) {
return immediateFlush || endOfBatch;
}
private void tryFlushQueue() {
List<String> logEvents = new ArrayList<>();
logQueue.drainTo(logEvents);
manager.sendBulk(logEvents);
}
@Override
public void start() {
setStarting();
manager.startup();
setStarted();
}
@Override
public boolean stop(final long timeout, final TimeUnit timeUnit) {
setStopping();
if (logQueue.size() > 0) {
tryFlushQueue();
}
boolean stopped = super.stop(timeout, timeUnit, false);
stopped &= manager.stop(timeout, timeUnit);
setStopped();
return stopped;
}
@Override
public String toString() {
return "RedisAppender{" +
"name=" + getName() +
", host=" + manager.getHost() +
", port=" + manager.getPort() +
", keys=" + manager.getKeysAsString() +
", immediateFlush=" + this.immediateFlush +
'}';
}
}