blob: 7a989c0c66deb97325b37b37e6f92e6095144ad6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.logging.log4j.flume.appender;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockConflictException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.util.ExecutorServices;
import org.apache.logging.log4j.core.util.FileUtils;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.core.util.Log4jThreadFactory;
import org.apache.logging.log4j.core.util.SecretKeyProvider;
import org.apache.logging.log4j.plugins.util.PluginManager;
import org.apache.logging.log4j.plugins.util.PluginType;
import org.apache.logging.log4j.util.Strings;
/**
 * Manager that persists data to Berkeley DB before passing it on to Flume.
 * Each event is written to a local Berkeley DB JE database first so it survives
 * process crashes and delivery failures; a background WriterThread drains the
 * database and forwards events to the Flume agents.
 */
public class FlumePersistentManager extends FlumeAvroManager {
/** Attribute name for the key provider. */
public static final String KEY_PROVIDER = "keyProvider";
// Charset used for GUID keys and serialized header strings.
private static final Charset UTF8 = StandardCharsets.UTF_8;
// Default database location, relative to the working directory, when none is configured.
private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
// Fallback wait for worker shutdown when the caller supplies no timeout.
private static final long SHUTDOWN_WAIT_MILLIS = 60000;
// Back-off between retries after a Berkeley DB lock conflict.
private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
private static final BDBManagerFactory factory = new BDBManagerFactory();
private final Database database;
private final Environment environment;
// Background thread that drains the database and sends events to Flume.
private final WriterThread worker;
// Latch used to wake the worker as soon as a full batch is available.
private final Gate gate = new Gate();
// Optional AES key; when non-null, persisted event payloads are encrypted.
private final SecretKey secretKey;
private final int lockTimeoutRetryCount;
// Pool executing BDBWriter tasks so interrupts on callers cannot close the database mid-write.
private final ExecutorService threadPool;
// In-memory mirror of the number of events currently persisted in the database.
private final AtomicLong dbCount = new AtomicLong();
/**
 * Constructor
 * @param name The unique name of this manager.
 * @param shortName Original name for the Manager.
 * @param agents An array of Agents.
 * @param batchSize The number of events to include in a batch.
 * @param retries The number of times to retry connecting before giving up.
 * @param connectionTimeout The amount of time to wait for a connection to be established.
 * @param requestTimeout The amount of time to wait for a response to a request.
 * @param delay The amount of time to wait between retries.
 * @param database The database to write to.
 * @param environment The database environment.
 * @param secretKey The SecretKey to use for encryption.
 * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
 */
protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
final int batchSize, final int retries, final int connectionTimeout,
final int requestTimeout, final int delay, final Database database,
final Environment environment, final SecretKey secretKey,
final int lockTimeoutRetryCount) {
super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
this.database = database;
this.environment = environment;
// Seed the in-memory counter with the number of events already persisted from a prior run.
dbCount.set(database.count());
this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
lockTimeoutRetryCount);
// NOTE(review): the worker is started before secretKey/threadPool are assigned below. The
// worker only uses the copies passed to its constructor, so this appears safe, but it does
// let the thread observe a partially constructed manager - confirm no 'this' escape issues.
this.worker.start();
this.secretKey = secretKey;
this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
/**
 * Returns a FlumePersistentManager.
 * @param name The name of the manager.
 * @param agents The agents to use.
 * @param properties Properties to pass to the Manager.
 * @param batchSize The number of events to include in a batch.
 * @param retries The number of times to retry connecting before giving up.
 * @param connectionTimeout The amount of time to wait to establish a connection.
 * @param requestTimeout The amount of time to wait for a response to a request.
 * @param delayMillis Amount of time to delay before delivering a batch.
 * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
 * @param dataDir The location of the Berkeley database; may be null or empty, in which
 *        case {@code DEFAULT_DATA_DIR} is used.
 * @return A FlumePersistentManager.
 * @throws IllegalArgumentException if no agents are supplied.
 */
public static FlumePersistentManager getManager(final String name, final Agent[] agents,
                                                final Property[] properties, int batchSize, final int retries,
                                                final int connectionTimeout, final int requestTimeout,
                                                final int delayMillis, final int lockTimeoutRetryCount,
                                                final String dataDir) {
    if (agents == null || agents.length == 0) {
        throw new IllegalArgumentException("At least one agent is required");
    }
    if (batchSize <= 0) {
        batchSize = 1;
    }
    // Resolve the directory once so both the cache key and the factory see the same value.
    final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
    final StringBuilder sb = new StringBuilder("FlumePersistent[");
    boolean first = true;
    for (final Agent agent : agents) {
        if (!first) {
            sb.append(',');
        }
        sb.append(agent.getHost()).append(':').append(agent.getPort());
        first = false;
    }
    sb.append(']');
    sb.append(' ').append(dataDirectory);
    // Pass the resolved directory (with the default applied), not the raw dataDir: the raw
    // value may be empty, which would make the factory open the database at new File("").
    return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
        connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
}
/**
 * Serializes the event (int body length, body bytes, int header count, then UTF key/value
 * pairs), optionally encrypts the payload, and persists it to Berkeley DB via the thread
 * pool, blocking until the write completes.
 * Note: the event must carry a FlumeEvent.GUID header; a missing header would raise an
 * unwrapped NullPointerException on the getBytes call below - TODO confirm callers
 * always set it.
 * @throws LoggingException if the worker has shut down or the write fails.
 */
@Override
public void send(final Event event) {
if (worker.isShutdown()) {
throw new LoggingException("Unable to record event");
}
final Map<String, String> headers = event.getHeaders();
final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
try {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream daos = new DataOutputStream(baos);
daos.writeInt(event.getBody().length);
daos.write(event.getBody(), 0, event.getBody().length);
daos.writeInt(event.getHeaders().size());
for (final Map.Entry<String, String> entry : headers.entrySet()) {
daos.writeUTF(entry.getKey());
daos.writeUTF(entry.getValue());
}
byte[] eventData = baos.toByteArray();
if (secretKey != null) {
// NOTE(review): the bare "AES" transformation resolves to the provider default
// (typically AES/ECB/PKCS5Padding). ECB mode is weak for structured data, but
// changing it would break decryption of already-persisted events - flagging only.
final Cipher cipher = Cipher.getInstance("AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
eventData = cipher.doFinal(eventData);
}
final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
try {
// Block until the database write finishes so the caller knows the event is durable.
future.get();
} catch (final InterruptedException ie) {
// preserve interruption status
Thread.currentThread().interrupt();
}
} catch (final Exception ex) {
throw new LoggingException("Exception occurred writing log event", ex);
}
}
/**
 * Shuts down the writer thread and thread pool, then closes the database and environment.
 * Order matters: the worker must stop before the database is closed, otherwise its
 * cursors/transactions would fail. Returns false if any close step failed.
 */
@Override
protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
boolean closed = true;
LOGGER.debug("Shutting down FlumePersistentManager");
worker.shutdown();
final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS;
try {
worker.join(shutdownWaitMillis);
} catch (final InterruptedException ie) {
// Ignore the exception and shutdown.
}
ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
// Second, untimed join: wait for the worker to finish flushing its final batch.
try {
worker.join();
} catch (final InterruptedException ex) {
logDebug("interrupted while waiting for worker to complete", ex);
}
try {
LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
database.close();
} catch (final Exception ex) {
logWarn("Failed to close database", ex);
closed = false;
}
try {
// cleanLog compacts the JE log files before closing the environment.
environment.cleanLog();
environment.close();
} catch (final Exception ex) {
logWarn("Failed to close environment", ex);
closed = false;
}
return closed && super.releaseSub(timeout, timeUnit);
}
/**
 * Forwards a single event straight to the Flume Avro transport, bypassing persistence.
 * Called by the WriterThread after an event has been read back from Berkeley DB.
 */
private void doSend(final SimpleEvent event) {
LOGGER.debug("Sending event to Flume");
super.send(event);
}
/**
 * Thread for writing to Berkeley DB to avoid having interrupts close the database.
 * Persists a single serialized event under its GUID key inside a transaction,
 * retrying up to lockTimeoutRetryCount times on lock conflicts.
 */
private static class BDBWriter implements Callable<Integer> {
    private final byte[] eventData;
    private final byte[] keyData;
    private final Environment environment;
    private final Database database;
    private final Gate gate;
    private final AtomicLong dbCount;
    private final long batchSize;
    private final int lockTimeoutRetryCount;

    public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
                     final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
                     final int lockTimeoutRetryCount) {
        this.keyData = keyData;
        this.eventData = eventData;
        this.environment = environment;
        this.database = database;
        this.gate = gate;
        this.dbCount = dbCount;
        this.batchSize = batchSize;
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    }

    /**
     * Writes the event inside a transaction and opens the gate once a full batch is queued.
     * @return the length of the stored event payload.
     * @throws Exception the last lock-conflict if all retries are exhausted, or any
     *         non-lock-conflict failure immediately.
     */
    @Override
    public Integer call() throws Exception {
        final DatabaseEntry key = new DatabaseEntry(keyData);
        final DatabaseEntry data = new DatabaseEntry(eventData);
        Exception exception = null;
        for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
            Transaction txn = null;
            try {
                txn = environment.beginTransaction(null, null);
                try {
                    database.put(txn, key, data);
                    txn.commit();
                    txn = null;
                    // Wake the writer thread as soon as enough events are queued for a batch.
                    if (dbCount.incrementAndGet() >= batchSize) {
                        gate.open();
                    }
                    exception = null;
                    break;
                } catch (final LockConflictException lce) {
                    exception = lce;
                    // Fall through and retry.
                } catch (final Exception ex) {
                    if (txn != null) {
                        txn.abort();
                        // Clear the reference so the finally block does not abort the same
                        // transaction a second time (a second abort is an error in JE).
                        txn = null;
                    }
                    throw ex;
                } finally {
                    if (txn != null) {
                        txn.abort();
                        txn = null;
                    }
                }
            } catch (final LockConflictException lce) {
                // beginTransaction itself hit a lock conflict.
                exception = lce;
                if (txn != null) {
                    try {
                        txn.abort();
                        txn = null;
                    } catch (final Exception ex) {
                        LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
                    }
                }
            }
            // Back off briefly before retrying after a lock conflict.
            try {
                Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
            } catch (final InterruptedException ie) {
                // Ignore the error
            }
        }
        if (exception != null) {
            throw exception;
        }
        return eventData.length;
    }
}
/**
 * Immutable bundle of configuration carried from getManager() to the factory.
 */
private static class FactoryData {
    private final String name;
    private final Agent[] agents;
    private final int batchSize;
    private final String dataDir;
    private final int retries;
    private final int connectionTimeout;
    private final int requestTimeout;
    private final int delayMillis;
    private final int lockTimeoutRetryCount;
    private final Property[] properties;

    /**
     * Constructor.
     * @param name The name of the Appender.
     * @param agents The agents.
     * @param batchSize The number of events to include in a batch.
     * @param retries The number of connection retries.
     * @param connectionTimeout The connection timeout.
     * @param requestTimeout The request timeout.
     * @param delayMillis The delay between batches.
     * @param lockTimeoutRetryCount The number of retries after a lock timeout.
     * @param dataDir The directory for data.
     * @param properties Additional configuration properties.
     */
    public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
                       final int connectionTimeout, final int requestTimeout, final int delayMillis,
                       final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
        this.name = name;
        this.agents = agents;
        this.batchSize = batchSize;
        this.retries = retries;
        this.connectionTimeout = connectionTimeout;
        this.requestTimeout = requestTimeout;
        this.delayMillis = delayMillis;
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
        this.dataDir = dataDir;
        this.properties = properties;
    }
}
/**
 * Factory that opens the Berkeley DB environment/database, resolves the optional
 * SecretKeyProvider plugin, and builds the FlumePersistentManager.
 */
private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
    /**
     * Create the FlumePersistentManager.
     * @param name The name of the entity to manage.
     * @param data The data required to create the entity.
     * @return The FlumePersistentManager, or null if the database could not be opened.
     */
    @Override
    public FlumePersistentManager createManager(final String name, final FactoryData data) {
        SecretKey secretKey = null;
        Database database = null;
        Environment environment = null;
        final Map<String, String> properties = new HashMap<>();
        if (data.properties != null) {
            for (final Property property : data.properties) {
                properties.put(property.getName(), property.getValue());
            }
        }
        try {
            final File dir = new File(data.dataDir);
            FileUtils.mkdir(dir, true);
            final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
            dbEnvConfig.setTransactional(true);
            dbEnvConfig.setAllowCreate(true);
            dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
            environment = new Environment(dir, dbEnvConfig);
            final DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);
            database = environment.openDatabase(null, name, dbConfig);
        } catch (final Exception ex) {
            LOGGER.error("Could not create FlumePersistentManager", ex);
            // For consistency, close database as well as environment even though it should never happen since the
            // database is the last thing in the block above, but this does guard against a future line being
            // inserted at the end that would bomb (like some debug logging).
            if (database != null) {
                database.close();
                database = null;
            }
            if (environment != null) {
                environment.close();
                environment = null;
            }
            return null;
        }
        try {
            // Encryption is optional: only configured when a "keyProvider" property names a plugin.
            String key = null;
            for (final Map.Entry<String, String> entry : properties.entrySet()) {
                if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
                    key = entry.getValue();
                    break;
                }
            }
            if (key != null) {
                final PluginManager manager = new PluginManager("KeyProvider");
                manager.collectPlugins();
                final Map<String, PluginType<?>> plugins = manager.getPlugins();
                if (plugins != null) {
                    boolean found = false;
                    for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
                        if (entry.getKey().equalsIgnoreCase(key)) {
                            found = true;
                            final Class<?> cl = entry.getValue().getPluginClass();
                            try {
                                // getDeclaredConstructor().newInstance() replaces the deprecated
                                // Class.newInstance(), which rethrows constructor exceptions unwrapped.
                                final SecretKeyProvider provider =
                                        (SecretKeyProvider) cl.getDeclaredConstructor().newInstance();
                                secretKey = provider.getSecretKey();
                                LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
                            } catch (final Exception ex) {
                                // Pass the exception to the logger so the failure cause is not swallowed.
                                LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
                                        cl.getName(), ex);
                            }
                            break;
                        }
                    }
                    if (!found) {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                    }
                } else {
                    LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
                }
            }
        } catch (final Exception ex) {
            LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
        }
        return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
            data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
            data.lockTimeoutRetryCount);
    }
}
/**
 * Thread that sends data to Flume and pulls it from Berkeley DB.
 * Wakes when a batch is ready (gate opened) or the delay expires, reads persisted
 * events back out of the database, forwards them, and deletes the delivered records.
 */
private static class WriterThread extends Log4jThread {
    // volatile: written by shutdown() on another thread, read in the run loop.
    private volatile boolean shutdown;
    private final Database database;
    private final Environment environment;
    private final FlumePersistentManager manager;
    private final Gate gate;
    private final SecretKey secretKey;
    private final int batchSize;
    // Shared counter mirroring the number of events persisted in the database.
    private final AtomicLong dbCounter;
    private final int lockTimeoutRetryCount;

    public WriterThread(final Database database, final Environment environment,
                        final FlumePersistentManager manager, final Gate gate, final int batchsize,
                        final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
        super("FlumePersistentManager-Writer");
        this.database = database;
        this.environment = environment;
        this.manager = manager;
        this.gate = gate;
        this.batchSize = batchsize;
        this.secretKey = secretKey;
        this.setDaemon(true);
        this.dbCounter = dbCount;
        this.lockTimeoutRetryCount = lockTimeoutRetryCount;
    }

    /** Signals the run loop to exit and opens the gate so a waiting thread wakes up. */
    public void shutdown() {
        LOGGER.debug("Writer thread shutting down");
        this.shutdown = true;
        gate.open();
    }

    public boolean isShutdown() {
        return shutdown;
    }

    /**
     * Main loop: when a full batch is queued (or the delay expires with pending events),
     * drains the database to Flume; otherwise waits on the gate until the next deadline.
     */
    @Override
    public void run() {
        LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
        long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
        while (!shutdown) {
            final long nowMillis = System.currentTimeMillis();
            final long dbCount = database.count();
            // Resynchronize the shared counter with the authoritative database count.
            dbCounter.set(dbCount);
            if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
                nextBatchMillis = nowMillis + manager.getDelayMillis();
                try {
                    boolean errors = false;
                    final DatabaseEntry key = new DatabaseEntry();
                    final DatabaseEntry data = new DatabaseEntry();
                    gate.close();
                    OperationStatus status;
                    if (batchSize > 1) {
                        try {
                            errors = sendBatch(key, data);
                        } catch (final Exception ex) {
                            // sendBatch already logged and set shutdown; exit the loop.
                            break;
                        }
                    } else {
                        // Single-event mode: send and delete one record at a time under RMW locks.
                        Exception exception = null;
                        for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                            exception = null;
                            Transaction txn = null;
                            Cursor cursor = null;
                            try {
                                txn = environment.beginTransaction(null, null);
                                cursor = database.openCursor(txn, null);
                                try {
                                    status = cursor.getFirst(key, data, LockMode.RMW);
                                    while (status == OperationStatus.SUCCESS) {
                                        final SimpleEvent event = createEvent(data);
                                        if (event != null) {
                                            try {
                                                manager.doSend(event);
                                            } catch (final Exception ioe) {
                                                errors = true;
                                                LOGGER.error("Error sending event", ioe);
                                                break;
                                            }
                                            try {
                                                cursor.delete();
                                            } catch (final Exception ex) {
                                                LOGGER.error("Unable to delete event", ex);
                                            }
                                        }
                                        status = cursor.getNext(key, data, LockMode.RMW);
                                    }
                                    // Close the cursor before committing, as JE requires.
                                    if (cursor != null) {
                                        cursor.close();
                                        cursor = null;
                                    }
                                    txn.commit();
                                    txn = null;
                                    dbCounter.decrementAndGet();
                                    exception = null;
                                    break;
                                } catch (final LockConflictException lce) {
                                    exception = lce;
                                    // Fall through and retry.
                                } catch (final Exception ex) {
                                    LOGGER.error("Error reading or writing to database", ex);
                                    shutdown = true;
                                    break;
                                } finally {
                                    if (cursor != null) {
                                        cursor.close();
                                        cursor = null;
                                    }
                                    if (txn != null) {
                                        txn.abort();
                                        txn = null;
                                    }
                                }
                            } catch (final LockConflictException lce) {
                                // beginTransaction/openCursor hit a lock conflict.
                                exception = lce;
                                if (cursor != null) {
                                    try {
                                        cursor.close();
                                        cursor = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                    }
                                }
                                if (txn != null) {
                                    try {
                                        txn.abort();
                                        txn = null;
                                    } catch (final Exception ex) {
                                        LOGGER.trace("Ignored exception aborting tx during lock conflict.");
                                    }
                                }
                            }
                            // Back off before the next retry.
                            try {
                                Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                            } catch (final InterruptedException ie) {
                                // Ignore the error
                            }
                        }
                        if (exception != null) {
                            LOGGER.error("Unable to read or update data base", exception);
                        }
                    }
                    if (errors) {
                        // Delivery failed; leave events persisted and wait before retrying.
                        Thread.sleep(manager.getDelayMillis());
                        continue;
                    }
                } catch (final Exception ex) {
                    LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
                }
            } else {
                if (nextBatchMillis <= nowMillis) {
                    nextBatchMillis = nowMillis + manager.getDelayMillis();
                }
                try {
                    // Sleep until a full batch arrives (gate opened) or the deadline passes.
                    final long interval = nextBatchMillis - nowMillis;
                    gate.waitForOpen(interval);
                } catch (final InterruptedException ie) {
                    LOGGER.warn("WriterThread interrupted, continuing");
                } catch (final Exception ex) {
                    LOGGER.error("WriterThread encountered an exception waiting for work", ex);
                    break;
                }
            }
        }
        // Final flush: push any remaining events before the thread exits.
        if (batchSize > 1 && database.count() > 0) {
            final DatabaseEntry key = new DatabaseEntry();
            final DatabaseEntry data = new DatabaseEntry();
            try {
                sendBatch(key, data);
            } catch (final Exception ex) {
                LOGGER.warn("Unable to write final batch");
            }
        }
        LOGGER.trace("WriterThread exiting");
    }

    /**
     * Reads up to batchSize events from the database, sends them to Flume as one batch,
     * then deletes the delivered records in a transaction, retrying on lock conflicts.
     * @param key reusable cursor key entry (reassigned for deletes).
     * @param data reusable cursor data entry.
     * @return true if sending failed (events are then left in the database for retry).
     * @throws Exception if the database cannot be read; also sets shutdown.
     */
    private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
        boolean errors = false;
        OperationStatus status;
        Cursor cursor = null;
        try {
            final BatchEvent batch = new BatchEvent();
            // Phase 1: collect up to batchSize events with a read-only cursor.
            for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                try {
                    cursor = database.openCursor(null, CursorConfig.DEFAULT);
                    status = cursor.getFirst(key, data, null);
                    for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
                        final SimpleEvent event = createEvent(data);
                        if (event != null) {
                            batch.addEvent(event);
                        }
                        status = cursor.getNext(key, data, null);
                    }
                    break;
                } catch (final LockConflictException lce) {
                    if (cursor != null) {
                        try {
                            cursor.close();
                            cursor = null;
                        } catch (final Exception ex) {
                            LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                        }
                    }
                }
            }
            // Phase 2: send the batch; on failure keep the records for a later retry.
            try {
                manager.send(batch);
            } catch (final Exception ioe) {
                LOGGER.error("Error sending events", ioe);
                errors = true;
            }
            if (!errors) {
                if (cursor != null) {
                    cursor.close();
                    cursor = null;
                }
                // Phase 3: delete the delivered events in a transaction.
                Transaction txn = null;
                Exception exception = null;
                for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
                    try {
                        txn = environment.beginTransaction(null, null);
                        try {
                            for (final Event event : batch.getEvents()) {
                                try {
                                    final Map<String, String> headers = event.getHeaders();
                                    key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
                                    database.delete(txn, key);
                                } catch (final Exception ex) {
                                    LOGGER.error("Error deleting key from database", ex);
                                }
                            }
                            txn.commit();
                            // Clear the reference so the finally block does not abort the
                            // committed transaction (consistent with the other commit paths).
                            txn = null;
                            long count = dbCounter.get();
                            while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
                                count = dbCounter.get();
                            }
                            exception = null;
                            break;
                        } catch (final LockConflictException lce) {
                            exception = lce;
                            if (cursor != null) {
                                try {
                                    cursor.close();
                                    cursor = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                                }
                            }
                            if (txn != null) {
                                try {
                                    txn.abort();
                                    txn = null;
                                } catch (final Exception ex) {
                                    LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                                }
                            }
                        } catch (final Exception ex) {
                            LOGGER.error("Unable to commit transaction", ex);
                            if (txn != null) {
                                txn.abort();
                            }
                        }
                    } catch (final LockConflictException lce) {
                        exception = lce;
                        if (cursor != null) {
                            try {
                                cursor.close();
                                cursor = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception closing cursor during lock conflict.");
                            }
                        }
                        if (txn != null) {
                            try {
                                txn.abort();
                                txn = null;
                            } catch (final Exception ex) {
                                LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
                            }
                        }
                    } finally {
                        if (cursor != null) {
                            cursor.close();
                            cursor = null;
                        }
                        if (txn != null) {
                            txn.abort();
                            txn = null;
                        }
                    }
                    // Back off before retrying the delete transaction.
                    try {
                        Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
                    } catch (final InterruptedException ie) {
                        // Ignore the error
                    }
                }
                if (exception != null) {
                    LOGGER.error("Unable to delete events from data base", exception);
                }
            }
        } catch (final Exception ex) {
            LOGGER.error("Error reading database", ex);
            shutdown = true;
            throw ex;
        } finally {
            if (cursor != null) {
                cursor.close();
            }
        }
        return errors;
    }

    /**
     * Deserializes (and decrypts, when a key is configured) a stored record back into a
     * SimpleEvent. Record format: int body length, body bytes, int header count, then
     * UTF key/value pairs - the mirror of FlumePersistentManager.send().
     * @return the reconstructed event, or null if the record could not be decoded.
     */
    private SimpleEvent createEvent(final DatabaseEntry data) {
        final SimpleEvent event = new SimpleEvent();
        try {
            byte[] eventData = data.getData();
            if (secretKey != null) {
                final Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.DECRYPT_MODE, secretKey);
                eventData = cipher.doFinal(eventData);
            }
            final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
            final DataInputStream dais = new DataInputStream(bais);
            int length = dais.readInt();
            final byte[] bytes = new byte[length];
            // readFully guarantees the entire body is read (plain read() may return fewer
            // bytes) and throws EOFException on a truncated record, handled below.
            dais.readFully(bytes, 0, length);
            event.setBody(bytes);
            length = dais.readInt();
            final Map<String, String> map = new HashMap<>(length);
            for (int i = 0; i < length; ++i) {
                final String headerKey = dais.readUTF();
                final String value = dais.readUTF();
                map.put(headerKey, value);
            }
            event.setHeaders(map);
            return event;
        } catch (final Exception ex) {
            LOGGER.error("Error retrieving event", ex);
            return null;
        }
    }
}
/**
 * Simple monitor-based latch used to wake the writer thread when a batch is ready.
 */
private static class Gate {
    // Guarded by 'this'; the synchronized getter guarantees cross-thread visibility.
    private boolean isOpen = false;

    // synchronized so readers on other threads see the latest value (the field is not volatile).
    public synchronized boolean isOpen() {
        return isOpen;
    }

    public synchronized void open() {
        isOpen = true;
        notifyAll();
    }

    public synchronized void close() {
        isOpen = false;
    }

    /**
     * Waits up to timeout milliseconds for open() to signal.
     * NOTE(review): does not re-check isOpen before waiting, so a gate opened before this
     * call still blocks until the timeout or the next notifyAll - confirm this is intended.
     */
    public synchronized void waitForOpen(final long timeout) throws InterruptedException {
        wait(timeout);
    }
}
}