blob: c74a3eac058c8b37cdd665b4a0584735b0711642 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.audit.provider;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
public class AsyncAuditProvider extends MultiDestAuditProvider implements
Runnable {
private static final Log LOG = LogFactory.getLog(AsyncAuditProvider.class);
private static int sThreadCount = 0;
private BlockingQueue<AuditEventBase> mQueue = null;
private Thread mThread = null;
private String mName = null;
private int mMaxQueueSize = 10 * 1024;
private int mMaxFlushInterval = 5000; // 5 seconds
private static final int mStopLoopIntervalSecs = 1; // 1 second
private static final int mWaitToCompleteLoopIntervalSecs = 1; // 1 second
// Summary of logs handled
private AtomicLong lifeTimeInLogCount = new AtomicLong(0); // Total count, including drop count
private AtomicLong lifeTimeOutLogCount = new AtomicLong(0);
private AtomicLong lifeTimeDropCount = new AtomicLong(0);
private AtomicLong intervalInLogCount = new AtomicLong(0);
private AtomicLong intervalOutLogCount = new AtomicLong(0);
private AtomicLong intervalDropCount = new AtomicLong(0);
private long lastIntervalLogTime = System.currentTimeMillis();
private int intervalLogDurationMS = 60000;
private long lastFlushTime = System.currentTimeMillis();
public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval) {
LOG.info("AsyncAuditProvider(" + name + "): creating..");
if(maxQueueSize < 1) {
LOG.warn("AsyncAuditProvider(" + name + "): invalid maxQueueSize=" + maxQueueSize + ". will use default " + mMaxQueueSize);
maxQueueSize = mMaxQueueSize;
}
mName = name;
mMaxQueueSize = maxQueueSize;
mMaxFlushInterval = maxFlushInterval;
mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize);
}
public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider) {
this(name, maxQueueSize, maxFlushInterval);
addAuditProvider(provider);
}
@Override
public void init(Properties props) {
LOG.info("AsyncAuditProvider(" + mName + ").init()");
super.init(props);
}
public int getIntervalLogDurationMS() {
return intervalLogDurationMS;
}
public void setIntervalLogDurationMS(int intervalLogDurationMS) {
this.intervalLogDurationMS = intervalLogDurationMS;
}
@Override
public boolean log(AuditEventBase event) {
LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)");
queueEvent(event);
return true;
}
@Override
public void start() {
mThread = new Thread(this, "AsyncAuditProvider" + (++sThreadCount));
mThread.setDaemon(true);
mThread.start();
super.start();
}
@Override
public void stop() {
LOG.info("==> AsyncAuditProvider.stop()");
try {
LOG.info("Interrupting child thread of " + mName + "..." );
mThread.interrupt();
while (mThread.isAlive()) {
try {
LOG.info(String.format("Waiting for child thread of %s to exit. Sleeping for %d secs", mName, mStopLoopIntervalSecs));
mThread.join(mStopLoopIntervalSecs * 1000);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for child thread to join! Proceeding with stop", e);
break;
}
}
super.stop();
} finally {
LOG.info("<== AsyncAuditProvider.stop()");
}
}
@Override
public void waitToComplete() {
waitToComplete(0);
super.waitToComplete();
}
@Override
public void run() {
LOG.info("==> AsyncAuditProvider.run()");
while (true) {
AuditEventBase event = null;
try {
event = dequeueEvent();
if (event != null) {
super.log(event);
} else {
lastFlushTime = System.currentTimeMillis();
flush();
}
} catch (InterruptedException excp) {
LOG.info("AsyncAuditProvider.run - Interrupted! Breaking out of while loop.");
break;
} catch (Exception excp) {
logFailedEvent(event, excp);
}
}
try {
lastFlushTime = System.currentTimeMillis();
flush();
} catch (Exception excp) {
LOG.error("AsyncAuditProvider.run()", excp);
}
LOG.info("<== AsyncAuditProvider.run()");
}
private void queueEvent(AuditEventBase event) {
// Increase counts
lifeTimeInLogCount.incrementAndGet();
intervalInLogCount.incrementAndGet();
if(! mQueue.offer(event)) {
lifeTimeDropCount.incrementAndGet();
intervalDropCount.incrementAndGet();
}
}
private AuditEventBase dequeueEvent() throws InterruptedException {
AuditEventBase ret = mQueue.poll();
while(ret == null) {
logSummaryIfRequired();
if (mMaxFlushInterval > 0 ) {
long timeTillNextFlush = getTimeTillNextFlush();
if (timeTillNextFlush <= 0) {
break; // force flush
}
ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS);
} else {
// Let's wake up for summary logging
long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime);
waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime;
ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS);
}
}
if(ret != null) {
lifeTimeOutLogCount.incrementAndGet();
intervalOutLogCount.incrementAndGet();
}
logSummaryIfRequired();
return ret;
}
private void logSummaryIfRequired() {
long intervalSinceLastLog = System.currentTimeMillis() - lastIntervalLogTime;
if (intervalSinceLastLog > intervalLogDurationMS) {
if (intervalInLogCount.get() > 0 || intervalOutLogCount.get() > 0 ) {
long queueSize = mQueue.size();
LOG.info("AsyncAuditProvider-stats:" + mName + ": past " + formatIntervalForLog(intervalSinceLastLog)
+ ": inLogs=" + intervalInLogCount.get()
+ ", outLogs=" + intervalOutLogCount.get()
+ ", dropped=" + intervalDropCount.get()
+ ", currentQueueSize=" + queueSize);
LOG.info("AsyncAuditProvider-stats:" + mName + ": process lifetime"
+ ": inLogs=" + lifeTimeInLogCount.get()
+ ", outLogs=" + lifeTimeOutLogCount.get()
+ ", dropped=" + lifeTimeDropCount.get());
}
lastIntervalLogTime = System.currentTimeMillis();
intervalInLogCount.set(0);
intervalOutLogCount.set(0);
intervalDropCount.set(0);
}
}
private boolean isEmpty() {
return mQueue.isEmpty();
}
public void waitToComplete(long maxWaitSeconds) {
LOG.debug("==> AsyncAuditProvider.waitToComplete()");
try {
for (long waitTime = 0; !isEmpty()
&& (maxWaitSeconds <= 0 || maxWaitSeconds > waitTime); waitTime += mWaitToCompleteLoopIntervalSecs) {
try {
LOG.info(String.format("%d messages yet to be flushed by %s. Sleeoping for %d sec", mQueue.size(), mName, mWaitToCompleteLoopIntervalSecs));
Thread.sleep(mWaitToCompleteLoopIntervalSecs * 1000);
} catch (InterruptedException excp) {
// someone really wants service to exit, abandon unwritten audits and exit.
LOG.warn("Caught interrupted exception! " + mQueue.size() + " messages still unflushed! Won't wait for queue to flush, exiting...", excp);
break;
}
}
} finally {
LOG.debug("<== AsyncAuditProvider.waitToComplete()");
}
}
private long getTimeTillNextFlush() {
long timeTillNextFlush = mMaxFlushInterval;
if (mMaxFlushInterval > 0) {
if (lastFlushTime != 0) {
long timeSinceLastFlush = System.currentTimeMillis()
- lastFlushTime;
if (timeSinceLastFlush >= mMaxFlushInterval) {
timeTillNextFlush = 0;
} else {
timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush;
}
}
}
return timeTillNextFlush;
}
}