| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache license, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the license for the specific language governing permissions and |
| * limitations under the license. |
| */ |
| package org.apache.logging.log4j.flume.appender; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.nio.charset.Charset; |
| import java.nio.charset.StandardCharsets; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.crypto.Cipher; |
| import javax.crypto.SecretKey; |
| |
| import com.sleepycat.je.Cursor; |
| import com.sleepycat.je.CursorConfig; |
| import com.sleepycat.je.Database; |
| import com.sleepycat.je.DatabaseConfig; |
| import com.sleepycat.je.DatabaseEntry; |
| import com.sleepycat.je.Environment; |
| import com.sleepycat.je.EnvironmentConfig; |
| import com.sleepycat.je.LockConflictException; |
| import com.sleepycat.je.LockMode; |
| import com.sleepycat.je.OperationStatus; |
| import com.sleepycat.je.StatsConfig; |
| import com.sleepycat.je.Transaction; |
| import org.apache.flume.Event; |
| import org.apache.flume.event.SimpleEvent; |
| import org.apache.logging.log4j.LoggingException; |
| import org.apache.logging.log4j.core.appender.ManagerFactory; |
| import org.apache.logging.log4j.core.config.Property; |
| import org.apache.logging.log4j.core.config.plugins.util.PluginManager; |
| import org.apache.logging.log4j.core.config.plugins.util.PluginType; |
| import org.apache.logging.log4j.core.util.ExecutorServices; |
| import org.apache.logging.log4j.core.util.FileUtils; |
| import org.apache.logging.log4j.core.util.Log4jThread; |
| import org.apache.logging.log4j.core.util.Log4jThreadFactory; |
| import org.apache.logging.log4j.core.util.SecretKeyProvider; |
| import org.apache.logging.log4j.util.Strings; |
| |
| /** |
| * Manager that persists data to Berkeley DB before passing it on to Flume. |
| */ |
| public class FlumePersistentManager extends FlumeAvroManager { |
| |
| /** Attribute name for the key provider. */ |
| public static final String KEY_PROVIDER = "keyProvider"; |
| |
| private static final Charset UTF8 = StandardCharsets.UTF_8; |
| |
| private static final String DEFAULT_DATA_DIR = ".log4j/flumeData"; |
| |
| private static final long SHUTDOWN_WAIT_MILLIS = 60000; |
| |
| private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500; |
| |
| private static BDBManagerFactory factory = new BDBManagerFactory(); |
| |
| private final Database database; |
| |
| private final Environment environment; |
| |
| private final WriterThread worker; |
| |
| private final Gate gate = new Gate(); |
| |
| private final SecretKey secretKey; |
| |
| private final int lockTimeoutRetryCount; |
| |
| private final ExecutorService threadPool; |
| |
| private final AtomicLong dbCount = new AtomicLong(); |
| |
| /** |
| * Constructor |
| * @param name The unique name of this manager. |
| * @param shortName Original name for the Manager. |
| * @param agents An array of Agents. |
| * @param batchSize The number of events to include in a batch. |
| * @param retries The number of times to retry connecting before giving up. |
| * @param connectionTimeout The amount of time to wait for a connection to be established. |
| * @param requestTimeout The amount of time to wair for a response to a request. |
| * @param delay The amount of time to wait between retries. |
| * @param database The database to write to. |
| * @param environment The database environment. |
| * @param secretKey The SecretKey to use for encryption. |
| * @param lockTimeoutRetryCount The number of times to retry a lock timeout. |
| */ |
| protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents, |
| final int batchSize, final int retries, final int connectionTimeout, |
| final int requestTimeout, final int delay, final Database database, |
| final Environment environment, final SecretKey secretKey, |
| final int lockTimeoutRetryCount) { |
| super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout); |
| this.database = database; |
| this.environment = environment; |
| dbCount.set(database.count()); |
| this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount, |
| lockTimeoutRetryCount); |
| this.worker.start(); |
| this.secretKey = secretKey; |
| this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume")); |
| this.lockTimeoutRetryCount = lockTimeoutRetryCount; |
| } |
| |
| |
| /** |
| * Returns a FlumeAvroManager. |
| * @param name The name of the manager. |
| * @param agents The agents to use. |
| * @param properties Properties to pass to the Manager. |
| * @param batchSize The number of events to include in a batch. |
| * @param retries The number of times to retry connecting before giving up. |
| * @param connectionTimeout The amount of time to wait to establish a connection. |
| * @param requestTimeout The amount of time to wait for a response to a request. |
| * @param delayMillis Amount of time to delay before delivering a batch. |
| * @param lockTimeoutRetryCount The number of times to retry after a lock timeout. |
| * @param dataDir The location of the Berkeley database. |
| * @return A FlumeAvroManager. |
| */ |
| public static FlumePersistentManager getManager(final String name, final Agent[] agents, |
| final Property[] properties, int batchSize, final int retries, |
| final int connectionTimeout, final int requestTimeout, |
| final int delayMillis, final int lockTimeoutRetryCount, |
| final String dataDir) { |
| if (agents == null || agents.length == 0) { |
| throw new IllegalArgumentException("At least one agent is required"); |
| } |
| |
| if (batchSize <= 0) { |
| batchSize = 1; |
| } |
| final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir; |
| |
| final StringBuilder sb = new StringBuilder("FlumePersistent["); |
| boolean first = true; |
| for (final Agent agent : agents) { |
| if (!first) { |
| sb.append(','); |
| } |
| sb.append(agent.getHost()).append(':').append(agent.getPort()); |
| first = false; |
| } |
| sb.append(']'); |
| sb.append(' ').append(dataDirectory); |
| return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, |
| connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties)); |
| } |
| |
| @Override |
| public void send(final Event event) { |
| if (worker.isShutdown()) { |
| throw new LoggingException("Unable to record event"); |
| } |
| |
| final Map<String, String> headers = event.getHeaders(); |
| final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8); |
| try { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| final DataOutputStream daos = new DataOutputStream(baos); |
| daos.writeInt(event.getBody().length); |
| daos.write(event.getBody(), 0, event.getBody().length); |
| daos.writeInt(event.getHeaders().size()); |
| for (final Map.Entry<String, String> entry : headers.entrySet()) { |
| daos.writeUTF(entry.getKey()); |
| daos.writeUTF(entry.getValue()); |
| } |
| byte[] eventData = baos.toByteArray(); |
| if (secretKey != null) { |
| final Cipher cipher = Cipher.getInstance("AES"); |
| cipher.init(Cipher.ENCRYPT_MODE, secretKey); |
| eventData = cipher.doFinal(eventData); |
| } |
| final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database, |
| gate, dbCount, getBatchSize(), lockTimeoutRetryCount)); |
| try { |
| future.get(); |
| } catch (final InterruptedException ie) { |
| // preserve interruption status |
| Thread.currentThread().interrupt(); |
| } |
| } catch (final Exception ex) { |
| throw new LoggingException("Exception occurred writing log event", ex); |
| } |
| } |
| |
| @Override |
| protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) { |
| boolean closed = true; |
| LOGGER.debug("Shutting down FlumePersistentManager"); |
| worker.shutdown(); |
| final long requestedTimeoutMillis = timeUnit.toMillis(timeout); |
| final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS; |
| try { |
| worker.join(shutdownWaitMillis); |
| } catch (final InterruptedException ie) { |
| // Ignore the exception and shutdown. |
| } |
| ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString()); |
| try { |
| worker.join(); |
| } catch (final InterruptedException ex) { |
| logDebug("interrupted while waiting for worker to complete", ex); |
| } |
| try { |
| LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig())); |
| database.close(); |
| } catch (final Exception ex) { |
| logWarn("Failed to close database", ex); |
| closed = false; |
| } |
| try { |
| environment.cleanLog(); |
| environment.close(); |
| } catch (final Exception ex) { |
| logWarn("Failed to close environment", ex); |
| closed = false; |
| } |
| return closed && super.releaseSub(timeout, timeUnit); |
| } |
| |
| private void doSend(final SimpleEvent event) { |
| LOGGER.debug("Sending event to Flume"); |
| super.send(event); |
| } |
| |
| /** |
| * Thread for writing to Berkeley DB to avoid having interrupts close the database. |
| */ |
| private static class BDBWriter implements Callable<Integer> { |
| private final byte[] eventData; |
| private final byte[] keyData; |
| private final Environment environment; |
| private final Database database; |
| private final Gate gate; |
| private final AtomicLong dbCount; |
| private final long batchSize; |
| private final int lockTimeoutRetryCount; |
| |
| public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment, |
| final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize, |
| final int lockTimeoutRetryCount) { |
| this.keyData = keyData; |
| this.eventData = eventData; |
| this.environment = environment; |
| this.database = database; |
| this.gate = gate; |
| this.dbCount = dbCount; |
| this.batchSize = batchSize; |
| this.lockTimeoutRetryCount = lockTimeoutRetryCount; |
| } |
| |
| @Override |
| public Integer call() throws Exception { |
| final DatabaseEntry key = new DatabaseEntry(keyData); |
| final DatabaseEntry data = new DatabaseEntry(eventData); |
| Exception exception = null; |
| for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { |
| Transaction txn = null; |
| try { |
| txn = environment.beginTransaction(null, null); |
| try { |
| database.put(txn, key, data); |
| txn.commit(); |
| txn = null; |
| if (dbCount.incrementAndGet() >= batchSize) { |
| gate.open(); |
| } |
| exception = null; |
| break; |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| // Fall through and retry. |
| } catch (final Exception ex) { |
| if (txn != null) { |
| txn.abort(); |
| } |
| throw ex; |
| } finally { |
| if (txn != null) { |
| txn.abort(); |
| txn = null; |
| } |
| } |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| if (txn != null) { |
| try { |
| txn.abort(); |
| txn = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignoring exception while aborting transaction during lock conflict."); |
| } |
| } |
| |
| } |
| try { |
| Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); |
| } catch (final InterruptedException ie) { |
| // Ignore the error |
| } |
| } |
| if (exception != null) { |
| throw exception; |
| } |
| return eventData.length; |
| } |
| } |
| |
| /** |
| * Factory data. |
| */ |
| private static class FactoryData { |
| private final String name; |
| private final Agent[] agents; |
| private final int batchSize; |
| private final String dataDir; |
| private final int retries; |
| private final int connectionTimeout; |
| private final int requestTimeout; |
| private final int delayMillis; |
| private final int lockTimeoutRetryCount; |
| private final Property[] properties; |
| |
| /** |
| * Constructor. |
| * @param name The name of the Appender. |
| * @param agents The agents. |
| * @param batchSize The number of events to include in a batch. |
| * @param dataDir The directory for data. |
| */ |
| public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries, |
| final int connectionTimeout, final int requestTimeout, final int delayMillis, |
| final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) { |
| this.name = name; |
| this.agents = agents; |
| this.batchSize = batchSize; |
| this.dataDir = dataDir; |
| this.retries = retries; |
| this.connectionTimeout = connectionTimeout; |
| this.requestTimeout = requestTimeout; |
| this.delayMillis = delayMillis; |
| this.lockTimeoutRetryCount = lockTimeoutRetryCount; |
| this.properties = properties; |
| } |
| } |
| |
| /** |
| * Avro Manager Factory. |
| */ |
| private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> { |
| |
| /** |
| * Create the FlumeKratiManager. |
| * @param name The name of the entity to manage. |
| * @param data The data required to create the entity. |
| * @return The FlumeKratiManager. |
| */ |
| @Override |
| public FlumePersistentManager createManager(final String name, final FactoryData data) { |
| SecretKey secretKey = null; |
| Database database = null; |
| Environment environment = null; |
| |
| final Map<String, String> properties = new HashMap<>(); |
| if (data.properties != null) { |
| for (final Property property : data.properties) { |
| properties.put(property.getName(), property.getValue()); |
| } |
| } |
| |
| try { |
| final File dir = new File(data.dataDir); |
| FileUtils.mkdir(dir, true); |
| final EnvironmentConfig dbEnvConfig = new EnvironmentConfig(); |
| dbEnvConfig.setTransactional(true); |
| dbEnvConfig.setAllowCreate(true); |
| dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS); |
| environment = new Environment(dir, dbEnvConfig); |
| final DatabaseConfig dbConfig = new DatabaseConfig(); |
| dbConfig.setTransactional(true); |
| dbConfig.setAllowCreate(true); |
| database = environment.openDatabase(null, name, dbConfig); |
| } catch (final Exception ex) { |
| LOGGER.error("Could not create FlumePersistentManager", ex); |
| // For consistency, close database as well as environment even though it should never happen since the |
| // database is that last thing in the block above, but this does guard against a future line being |
| // inserted at the end that would bomb (like some debug logging). |
| if (database != null) { |
| database.close(); |
| database = null; |
| } |
| if (environment != null) { |
| environment.close(); |
| environment = null; |
| } |
| return null; |
| } |
| |
| try { |
| String key = null; |
| for (final Map.Entry<String, String> entry : properties.entrySet()) { |
| if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) { |
| key = entry.getValue(); |
| break; |
| } |
| } |
| if (key != null) { |
| final PluginManager manager = new PluginManager("KeyProvider"); |
| manager.collectPlugins(); |
| final Map<String, PluginType<?>> plugins = manager.getPlugins(); |
| if (plugins != null) { |
| boolean found = false; |
| for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) { |
| if (entry.getKey().equalsIgnoreCase(key)) { |
| found = true; |
| final Class<?> cl = entry.getValue().getPluginClass(); |
| try { |
| final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance(); |
| secretKey = provider.getSecretKey(); |
| LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName()); |
| } catch (final Exception ex) { |
| LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", |
| cl.getName()); |
| } |
| break; |
| } |
| } |
| if (!found) { |
| LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); |
| } |
| } else { |
| LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key); |
| } |
| } |
| } catch (final Exception ex) { |
| LOGGER.warn("Error setting up encryption - encryption will be disabled", ex); |
| } |
| return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, |
| data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey, |
| data.lockTimeoutRetryCount); |
| } |
| } |
| |
| /** |
| * Thread that sends data to Flume and pulls it from Berkeley DB. |
| */ |
| private static class WriterThread extends Log4jThread { |
/** Set once the thread has been asked to stop, or has stopped itself after a database error. */
private volatile boolean shutdown = false;
private final Database database;
private final Environment environment;
private final FlumePersistentManager manager;
private final Gate gate;
private final SecretKey secretKey;
private final int batchSize;
private final AtomicLong dbCounter;
private final int lockTimeoutRetryCount;

/**
 * Creates the daemon thread; it is not started here.
 *
 * @param database The database to read events from.
 * @param environment The database environment used to begin transactions.
 * @param manager The manager through which events are sent to Flume.
 * @param gate The gate this thread waits on between batches.
 * @param batchsize The number of events to include in a batch.
 * @param secretKey The key used to decrypt stored events, or null if they are not encrypted.
 * @param dbCount The shared record counter.
 * @param lockTimeoutRetryCount The maximum number of attempts for a database operation.
 */
public WriterThread(final Database database, final Environment environment,
        final FlumePersistentManager manager, final Gate gate, final int batchsize,
        final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
    super("FlumePersistentManager-Writer");
    setDaemon(true);
    this.manager = manager;
    this.database = database;
    this.environment = environment;
    this.secretKey = secretKey;
    this.gate = gate;
    this.dbCounter = dbCount;
    this.batchSize = batchsize;
    this.lockTimeoutRetryCount = lockTimeoutRetryCount;
}
| |
| public void shutdown() { |
| LOGGER.debug("Writer thread shutting down"); |
| this.shutdown = true; |
| gate.open(); |
| } |
| |
| public boolean isShutdown() { |
| return shutdown; |
| } |
| |
| @Override |
| public void run() { |
| LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis()); |
| long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis(); |
| while (!shutdown) { |
| final long nowMillis = System.currentTimeMillis(); |
| final long dbCount = database.count(); |
| dbCounter.set(dbCount); |
| if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) { |
| nextBatchMillis = nowMillis + manager.getDelayMillis(); |
| try { |
| boolean errors = false; |
| final DatabaseEntry key = new DatabaseEntry(); |
| final DatabaseEntry data = new DatabaseEntry(); |
| |
| gate.close(); |
| OperationStatus status; |
| if (batchSize > 1) { |
| try { |
| errors = sendBatch(key, data); |
| } catch (final Exception ex) { |
| break; |
| } |
| } else { |
| Exception exception = null; |
| for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { |
| exception = null; |
| Transaction txn = null; |
| Cursor cursor = null; |
| try { |
| txn = environment.beginTransaction(null, null); |
| cursor = database.openCursor(txn, null); |
| try { |
| status = cursor.getFirst(key, data, LockMode.RMW); |
| while (status == OperationStatus.SUCCESS) { |
| final SimpleEvent event = createEvent(data); |
| if (event != null) { |
| try { |
| manager.doSend(event); |
| } catch (final Exception ioe) { |
| errors = true; |
| LOGGER.error("Error sending event", ioe); |
| break; |
| } |
| try { |
| cursor.delete(); |
| } catch (final Exception ex) { |
| LOGGER.error("Unable to delete event", ex); |
| } |
| } |
| status = cursor.getNext(key, data, LockMode.RMW); |
| } |
| if (cursor != null) { |
| cursor.close(); |
| cursor = null; |
| } |
| txn.commit(); |
| txn = null; |
| dbCounter.decrementAndGet(); |
| exception = null; |
| break; |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| // Fall through and retry. |
| } catch (final Exception ex) { |
| LOGGER.error("Error reading or writing to database", ex); |
| shutdown = true; |
| break; |
| } finally { |
| if (cursor != null) { |
| cursor.close(); |
| cursor = null; |
| } |
| if (txn != null) { |
| txn.abort(); |
| txn = null; |
| } |
| } |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| if (cursor != null) { |
| try { |
| cursor.close(); |
| cursor = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception closing cursor during lock conflict."); |
| } |
| } |
| if (txn != null) { |
| try { |
| txn.abort(); |
| txn = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception aborting tx during lock conflict."); |
| } |
| } |
| } |
| try { |
| Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); |
| } catch (final InterruptedException ie) { |
| // Ignore the error |
| } |
| } |
| if (exception != null) { |
| LOGGER.error("Unable to read or update data base", exception); |
| } |
| } |
| if (errors) { |
| Thread.sleep(manager.getDelayMillis()); |
| continue; |
| } |
| } catch (final Exception ex) { |
| LOGGER.warn("WriterThread encountered an exception. Continuing.", ex); |
| } |
| } else { |
| if (nextBatchMillis <= nowMillis) { |
| nextBatchMillis = nowMillis + manager.getDelayMillis(); |
| } |
| try { |
| final long interval = nextBatchMillis - nowMillis; |
| gate.waitForOpen(interval); |
| } catch (final InterruptedException ie) { |
| LOGGER.warn("WriterThread interrupted, continuing"); |
| } catch (final Exception ex) { |
| LOGGER.error("WriterThread encountered an exception waiting for work", ex); |
| break; |
| } |
| } |
| } |
| |
| if (batchSize > 1 && database.count() > 0) { |
| final DatabaseEntry key = new DatabaseEntry(); |
| final DatabaseEntry data = new DatabaseEntry(); |
| try { |
| sendBatch(key, data); |
| } catch (final Exception ex) { |
| LOGGER.warn("Unable to write final batch"); |
| } |
| } |
| LOGGER.trace("WriterThread exiting"); |
| } |
| |
| private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception { |
| boolean errors = false; |
| OperationStatus status; |
| Cursor cursor = null; |
| try { |
| final BatchEvent batch = new BatchEvent(); |
| for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { |
| try { |
| cursor = database.openCursor(null, CursorConfig.DEFAULT); |
| status = cursor.getFirst(key, data, null); |
| |
| for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) { |
| final SimpleEvent event = createEvent(data); |
| if (event != null) { |
| batch.addEvent(event); |
| } |
| status = cursor.getNext(key, data, null); |
| } |
| break; |
| } catch (final LockConflictException lce) { |
| if (cursor != null) { |
| try { |
| cursor.close(); |
| cursor = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception closing cursor during lock conflict."); |
| } |
| } |
| } |
| } |
| |
| try { |
| manager.send(batch); |
| } catch (final Exception ioe) { |
| LOGGER.error("Error sending events", ioe); |
| errors = true; |
| } |
| if (!errors) { |
| if (cursor != null) { |
| cursor.close(); |
| cursor = null; |
| } |
| Transaction txn = null; |
| Exception exception = null; |
| for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) { |
| try { |
| txn = environment.beginTransaction(null, null); |
| try { |
| for (final Event event : batch.getEvents()) { |
| try { |
| final Map<String, String> headers = event.getHeaders(); |
| key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8)); |
| database.delete(txn, key); |
| } catch (final Exception ex) { |
| LOGGER.error("Error deleting key from database", ex); |
| } |
| } |
| txn.commit(); |
| long count = dbCounter.get(); |
| while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) { |
| count = dbCounter.get(); |
| } |
| exception = null; |
| break; |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| if (cursor != null) { |
| try { |
| cursor.close(); |
| cursor = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception closing cursor during lock conflict."); |
| } |
| } |
| if (txn != null) { |
| try { |
| txn.abort(); |
| txn = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception aborting transaction during lock conflict."); |
| } |
| } |
| } catch (final Exception ex) { |
| LOGGER.error("Unable to commit transaction", ex); |
| if (txn != null) { |
| txn.abort(); |
| } |
| } |
| } catch (final LockConflictException lce) { |
| exception = lce; |
| if (cursor != null) { |
| try { |
| cursor.close(); |
| cursor = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception closing cursor during lock conflict."); |
| } |
| } |
| if (txn != null) { |
| try { |
| txn.abort(); |
| txn = null; |
| } catch (final Exception ex) { |
| LOGGER.trace("Ignored exception aborting transaction during lock conflict."); |
| } |
| } |
| } finally { |
| if (cursor != null) { |
| cursor.close(); |
| cursor = null; |
| } |
| if (txn != null) { |
| txn.abort(); |
| txn = null; |
| } |
| } |
| try { |
| Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS); |
| } catch (final InterruptedException ie) { |
| // Ignore the error |
| } |
| } |
| if (exception != null) { |
| LOGGER.error("Unable to delete events from data base", exception); |
| } |
| } |
| } catch (final Exception ex) { |
| LOGGER.error("Error reading database", ex); |
| shutdown = true; |
| throw ex; |
| } finally { |
| if (cursor != null) { |
| cursor.close(); |
| } |
| } |
| |
| return errors; |
| } |
| |
| private SimpleEvent createEvent(final DatabaseEntry data) { |
| final SimpleEvent event = new SimpleEvent(); |
| try { |
| byte[] eventData = data.getData(); |
| if (secretKey != null) { |
| final Cipher cipher = Cipher.getInstance("AES"); |
| cipher.init(Cipher.DECRYPT_MODE, secretKey); |
| eventData = cipher.doFinal(eventData); |
| } |
| final ByteArrayInputStream bais = new ByteArrayInputStream(eventData); |
| final DataInputStream dais = new DataInputStream(bais); |
| int length = dais.readInt(); |
| final byte[] bytes = new byte[length]; |
| dais.read(bytes, 0, length); |
| event.setBody(bytes); |
| length = dais.readInt(); |
| final Map<String, String> map = new HashMap<>(length); |
| for (int i = 0; i < length; ++i) { |
| final String headerKey = dais.readUTF(); |
| final String value = dais.readUTF(); |
| map.put(headerKey, value); |
| } |
| event.setHeaders(map); |
| return event; |
| } catch (final Exception ex) { |
| LOGGER.error("Error retrieving event", ex); |
| return null; |
| } |
| } |
| |
| } |
| |
| /** |
| * An internal class. |
| */ |
| private static class Gate { |
| |
| private boolean isOpen = false; |
| |
| public boolean isOpen() { |
| return isOpen; |
| } |
| |
| public synchronized void open() { |
| isOpen = true; |
| notifyAll(); |
| } |
| |
| public synchronized void close() { |
| isOpen = false; |
| } |
| |
| public synchronized void waitForOpen(final long timeout) throws InterruptedException { |
| wait(timeout); |
| } |
| } |
| } |