blob: e1667a47c01aa719d482ef87bc3a93af0eecabbb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ranger.audit.queue;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.destination.AuditDestination;
import org.apache.ranger.audit.provider.AuditHandler;
import org.apache.ranger.audit.provider.BaseAuditHandler;
import org.apache.ranger.audit.provider.MiscUtil;
public abstract class AuditQueue extends BaseAuditHandler {
private static final Log LOG = LogFactory.getLog(AuditQueue.class);
public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 3000;
public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
// This is the max time the consumer thread will wait before exiting the
// loop
public static final int AUDIT_CONSUMER_THREAD_WAIT_MS = 5000;
private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT;
public static final String PROP_QUEUE = "queue";
public static final String PROP_BATCH_SIZE = "batch.size";
public static final String PROP_QUEUE_SIZE = "queue.size";
public static final String PROP_BATCH_INTERVAL = "batch.interval.ms";
public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable";
public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms";
public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent";
final protected AuditHandler consumer;
protected AuditFileSpool fileSpooler = null;
private boolean isDrain = false;
protected boolean fileSpoolerEnabled = false;
protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
protected int fileSpoolDrainThresholdPercent = 80;
boolean isConsumerDestination = false;
// This is set when the first time stop is called.
protected long stopTime = 0;
/**
* @param consumer
*/
public AuditQueue(AuditHandler consumer) {
this.consumer = consumer;
if (consumer instanceof BaseAuditHandler) {
BaseAuditHandler baseAuditHander = (BaseAuditHandler) consumer;
baseAuditHander.setParentPath(getName());
}
if (consumer != null && consumer instanceof AuditDestination) {
// If consumer is destination, then the thread should run as server
// user
isConsumerDestination = true;
}
}
@Override
public void init(Properties props, String basePropertyName) {
LOG.info("BaseAuditProvider.init()");
super.init(props, basePropertyName);
setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "."
+ PROP_BATCH_SIZE, getMaxBatchSize()));
setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "."
+ PROP_QUEUE_SIZE, getMaxQueueSize()));
setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "."
+ PROP_BATCH_INTERVAL, getMaxBatchInterval()));
fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix
+ "." + PROP_FILE_SPOOL_ENABLE, false);
String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR);
if (fileSpoolerEnabled || logFolderProp != null) {
LOG.info("File spool is enabled for " + getName()
+ ", logFolderProp=" + logFolderProp + ", " + propPrefix
+ "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "="
+ fileSpoolerEnabled);
fileSpoolerEnabled = true;
fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix
+ "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN,
fileSpoolMaxWaitTime);
fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props,
propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD,
fileSpoolDrainThresholdPercent);
fileSpooler = new AuditFileSpool(this, consumer);
if (!fileSpooler.init(props, basePropertyName)) {
fileSpoolerEnabled = false;
LOG.fatal("Couldn't initialize file spooler. Disabling it. queue="
+ getName() + ", consumer=" + consumer.getName());
}
} else {
LOG.info("File spool is disabled for " + getName());
}
}
@Override
public void setParentPath(String parentPath) {
super.setParentPath(parentPath);
if (consumer != null && consumer instanceof BaseAuditHandler) {
BaseAuditHandler base = (BaseAuditHandler) consumer;
base.setParentPath(getName());
}
}
@Override
public String getFinalPath() {
if (consumer != null) {
if (consumer instanceof BaseAuditHandler) {
return ((BaseAuditHandler) consumer).getFinalPath();
} else {
return consumer.getName();
}
}
return getName();
}
@Override
public void setName(String name) {
super.setName(name);
if (consumer != null && consumer instanceof BaseAuditHandler) {
BaseAuditHandler base = (BaseAuditHandler) consumer;
base.setParentPath(getName());
}
}
public AuditHandler getConsumer() {
return consumer;
}
public boolean isDrainMaxTimeElapsed() {
return (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS;
}
public boolean isDrain() {
return isDrain;
}
public void setDrain(boolean isDrain) {
if (isDrain && stopTime != 0) {
stopTime = System.currentTimeMillis();
}
this.isDrain = isDrain;
}
public int getMaxQueueSize() {
return maxQueueSize;
}
public void setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
public int getMaxBatchInterval() {
return maxBatchInterval;
}
public void setMaxBatchInterval(int maxBatchInterval) {
this.maxBatchInterval = maxBatchInterval;
}
public int getMaxBatchSize() {
return maxBatchSize;
}
public void setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/*
* (non-Javadoc)
*
* @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
*/
@Override
public void waitToComplete() {
if (consumer != null) {
consumer.waitToComplete(-1);
}
}
@Override
public void waitToComplete(long timeout) {
if (consumer != null) {
consumer.waitToComplete(timeout);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.ranger.audit.provider.AuditProvider#flush()
*/
@Override
public void flush() {
if (consumer != null) {
consumer.flush();
}
}
}