blob: 7ff76e54ffb8cc5ea054fe713315b486a67d4837 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms.server;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Provides convenience methods for audit logging consistently the different
* types of events.
*/
public class KMSAudit {
private static class AuditEvent {
private final AtomicLong accessCount = new AtomicLong(-1);
private final String keyName;
private final String user;
private final KMS.KMSOp op;
private final String extraMsg;
private final long startTime = System.currentTimeMillis();
private AuditEvent(String keyName, String user, KMS.KMSOp op, String msg) {
this.keyName = keyName;
this.user = user;
this.op = op;
this.extraMsg = msg;
}
public String getExtraMsg() {
return extraMsg;
}
public AtomicLong getAccessCount() {
return accessCount;
}
public String getKeyName() {
return keyName;
}
public String getUser() {
return user;
}
public KMS.KMSOp getOp() {
return op;
}
public long getStartTime() {
return startTime;
}
}
public static enum OpStatus {
OK, UNAUTHORIZED, UNAUTHENTICATED, ERROR;
}
private static Set<KMS.KMSOp> AGGREGATE_OPS_WHITELIST = Sets.newHashSet(
KMS.KMSOp.GET_KEY_VERSION, KMS.KMSOp.GET_CURRENT_KEY,
KMS.KMSOp.DECRYPT_EEK, KMS.KMSOp.GENERATE_EEK
);
private Cache<String, AuditEvent> cache;
private ScheduledExecutorService executor;
public static final String KMS_LOGGER_NAME = "kms-audit";
private static Logger AUDIT_LOG = LoggerFactory.getLogger(KMS_LOGGER_NAME);
/**
* Create a new KMSAudit.
*
* @param windowMs Duplicate events within the aggregation window are quashed
* to reduce log traffic. A single message for aggregated
* events is printed at the end of the window, along with a
* count of the number of aggregated events.
*/
KMSAudit(long windowMs) {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(windowMs, TimeUnit.MILLISECONDS)
.removalListener(
new RemovalListener<String, AuditEvent>() {
@Override
public void onRemoval(
RemovalNotification<String, AuditEvent> entry) {
AuditEvent event = entry.getValue();
if (event.getAccessCount().get() > 0) {
KMSAudit.this.logEvent(event);
event.getAccessCount().set(0);
KMSAudit.this.cache.put(entry.getKey(), event);
}
}
}).build();
executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat(KMS_LOGGER_NAME + "_thread").build());
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cache.cleanUp();
}
}, windowMs / 10, windowMs / 10, TimeUnit.MILLISECONDS);
}
private void logEvent(AuditEvent event) {
AUDIT_LOG.info(
"OK[op={}, key={}, user={}, accessCount={}, interval={}ms] {}",
event.getOp(), event.getKeyName(), event.getUser(),
event.getAccessCount().get(),
(System.currentTimeMillis() - event.getStartTime()),
event.getExtraMsg());
}
private void op(OpStatus opStatus, final KMS.KMSOp op, final String user,
final String key, final String extraMsg) {
if (!Strings.isNullOrEmpty(user) && !Strings.isNullOrEmpty(key)
&& (op != null)
&& AGGREGATE_OPS_WHITELIST.contains(op)) {
String cacheKey = createCacheKey(user, key, op);
if (opStatus == OpStatus.UNAUTHORIZED) {
cache.invalidate(cacheKey);
AUDIT_LOG.info("UNAUTHORIZED[op={}, key={}, user={}] {}", op, key, user,
extraMsg);
} else {
try {
AuditEvent event = cache.get(cacheKey, new Callable<AuditEvent>() {
@Override
public AuditEvent call() throws Exception {
return new AuditEvent(key, user, op, extraMsg);
}
});
// Log first access (initialized as -1 so
// incrementAndGet() == 0 implies first access)
if (event.getAccessCount().incrementAndGet() == 0) {
event.getAccessCount().incrementAndGet();
logEvent(event);
}
} catch (ExecutionException ex) {
throw new RuntimeException(ex);
}
}
} else {
List<String> kvs = new LinkedList<String>();
if (op != null) {
kvs.add("op=" + op);
}
if (!Strings.isNullOrEmpty(key)) {
kvs.add("key=" + key);
}
if (!Strings.isNullOrEmpty(user)) {
kvs.add("user=" + user);
}
if (kvs.size() == 0) {
AUDIT_LOG.info("{} {}", opStatus.toString(), extraMsg);
} else {
String join = Joiner.on(", ").join(kvs);
AUDIT_LOG.info("{}[{}] {}", opStatus.toString(), join, extraMsg);
}
}
}
public void ok(UserGroupInformation user, KMS.KMSOp op, String key,
String extraMsg) {
op(OpStatus.OK, op, user.getShortUserName(), key, extraMsg);
}
public void ok(UserGroupInformation user, KMS.KMSOp op, String extraMsg) {
op(OpStatus.OK, op, user.getShortUserName(), null, extraMsg);
}
public void unauthorized(UserGroupInformation user, KMS.KMSOp op, String key) {
op(OpStatus.UNAUTHORIZED, op, user.getShortUserName(), key, "");
}
public void error(UserGroupInformation user, String method, String url,
String extraMsg) {
op(OpStatus.ERROR, null, user.getShortUserName(), null, "Method:'" + method
+ "' Exception:'" + extraMsg + "'");
}
public void unauthenticated(String remoteHost, String method,
String url, String extraMsg) {
op(OpStatus.UNAUTHENTICATED, null, null, null, "RemoteHost:"
+ remoteHost + " Method:" + method
+ " URL:" + url + " ErrorMsg:'" + extraMsg + "'");
}
private static String createCacheKey(String user, String key, KMS.KMSOp op) {
return user + "#" + key + "#" + op;
}
public void shutdown() {
executor.shutdownNow();
}
}