| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.security; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.io.StringWriter; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.codahale.metrics.Counter; |
| import com.codahale.metrics.Meter; |
| import com.codahale.metrics.Timer; |
| import com.fasterxml.jackson.annotation.JsonInclude.Include; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.SerializationFeature; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.core.SolrInfoBean; |
| import org.apache.solr.metrics.SolrMetricProducer; |
| import org.apache.solr.metrics.SolrMetricsContext; |
| import org.apache.solr.security.AuditEvent.EventType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Base class for Audit logger plugins. |
| * This interface may change in next release and is marked experimental |
| * @since 8.1.0 |
| * @lucene.experimental |
| */ |
| public abstract class AuditLoggerPlugin implements Closeable, Runnable, SolrInfoBean, SolrMetricProducer { |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| private static final String PARAM_EVENT_TYPES = "eventTypes"; |
| static final String PARAM_ASYNC = "async"; |
| static final String PARAM_BLOCKASYNC = "blockAsync"; |
| static final String PARAM_QUEUE_SIZE = "queueSize"; |
| static final String PARAM_NUM_THREADS = "numThreads"; |
| static final String PARAM_MUTE_RULES = "muteRules"; |
| private static final int DEFAULT_QUEUE_SIZE = 4096; |
| private static final int DEFAULT_NUM_THREADS = Math.max(2, Runtime.getRuntime().availableProcessors() / 2); |
| |
| BlockingQueue<AuditEvent> queue; |
| AtomicInteger auditsInFlight = new AtomicInteger(0); |
| boolean async; |
| boolean blockAsync; |
| int blockingQueueSize; |
| |
| protected AuditEventFormatter formatter; |
| private Set<String> metricNames = ConcurrentHashMap.newKeySet(); |
| private ExecutorService executorService; |
| private boolean closed; |
| private MuteRules muteRules; |
| |
| protected SolrMetricsContext solrMetricsContext; |
| protected Meter numErrors = new Meter(); |
| protected Meter numLost = new Meter(); |
| protected Meter numLogged = new Meter(); |
| protected Timer requestTimes = new Timer(); |
| protected Timer queuedTime = new Timer(); |
| protected Counter totalTime = new Counter(); |
| |
| // Event types to be logged by default |
| protected List<String> eventTypes = Arrays.asList( |
| EventType.COMPLETED.name(), |
| EventType.ERROR.name(), |
| EventType.REJECTED.name(), |
| EventType.UNAUTHORIZED.name(), |
| EventType.ANONYMOUS_REJECTED.name()); |
| |
| /** |
| * Initialize the plugin from security.json. |
| * This method removes parameters from config object after consuming, so subclasses can check for config errors. |
| * @param pluginConfig the config for the plugin |
| */ |
| @SuppressWarnings({"unchecked"}) |
| public void init(Map<String, Object> pluginConfig) { |
| formatter = new JSONAuditEventFormatter(); |
| if (pluginConfig.containsKey(PARAM_EVENT_TYPES)) { |
| eventTypes = (List<String>) pluginConfig.get(PARAM_EVENT_TYPES); |
| } |
| pluginConfig.remove(PARAM_EVENT_TYPES); |
| |
| async = Boolean.parseBoolean(String.valueOf(pluginConfig.getOrDefault(PARAM_ASYNC, true))); |
| blockAsync = Boolean.parseBoolean(String.valueOf(pluginConfig.getOrDefault(PARAM_BLOCKASYNC, false))); |
| blockingQueueSize = async ? Integer.parseInt(String.valueOf(pluginConfig.getOrDefault(PARAM_QUEUE_SIZE, DEFAULT_QUEUE_SIZE))) : 1; |
| int numThreads = async ? Integer.parseInt(String.valueOf(pluginConfig.getOrDefault(PARAM_NUM_THREADS, DEFAULT_NUM_THREADS))) : 1; |
| muteRules = new MuteRules(pluginConfig.remove(PARAM_MUTE_RULES)); |
| pluginConfig.remove(PARAM_ASYNC); |
| pluginConfig.remove(PARAM_BLOCKASYNC); |
| pluginConfig.remove(PARAM_QUEUE_SIZE); |
| pluginConfig.remove(PARAM_NUM_THREADS); |
| if (async) { |
| queue = new ArrayBlockingQueue<>(blockingQueueSize); |
| executorService = ExecutorUtil.newMDCAwareFixedThreadPool(numThreads, new SolrNamedThreadFactory("audit")); |
| executorService.submit(this); |
| } |
| pluginConfig.remove("class"); |
| log.debug("AuditLogger initialized in {} mode with event types {}", async?"async":"syncronous", eventTypes); |
| } |
| |
| /** |
| * This is the method that each Audit plugin has to implement to do the actual logging. |
| * @param event the audit event |
| */ |
| protected abstract void audit(AuditEvent event); |
| |
| /** |
| * Called by the framework, and takes care of metrics tracking and to dispatch |
| * to either synchronous or async logging. |
| */ |
| public final void doAudit(AuditEvent event) { |
| if (shouldMute(event)) { |
| log.debug("Event muted due to mute rule(s)"); |
| return; |
| } |
| if (async) { |
| auditAsync(event); |
| } else { |
| Timer.Context timer = requestTimes.time(); |
| numLogged.mark(); |
| try { |
| audit(event); |
| } catch(Exception e) { |
| numErrors.mark(); |
| log.error("Exception when attempting to audit log", e); |
| } finally { |
| totalTime.inc(timer.stop()); |
| } |
| } |
| } |
| |
| /** |
| * Returns true if any of the configured mute rules matches. The inner lists are ORed, while rules inside |
| * inner lists are ANDed |
| * @param event the audit event |
| */ |
| protected boolean shouldMute(AuditEvent event) { |
| return muteRules.shouldMute(event); |
| } |
| |
| /** |
| * Enqueues an {@link AuditEvent} to a queue and returns immediately. |
| * A background thread will pull events from this queue and call {@link #audit(AuditEvent)} |
| * @param event the audit event |
| */ |
| protected final void auditAsync(AuditEvent event) { |
| assert(async); |
| if (blockAsync) { |
| try { |
| queue.put(event); |
| } catch (InterruptedException e) { |
| log.warn("Interrupted while waiting to insert AuditEvent into blocking queue"); |
| Thread.currentThread().interrupt(); |
| } |
| } else { |
| if (!queue.offer(event)) { |
| log.warn("Audit log async queue is full (size={}), not blocking since {}==false", blockingQueueSize, PARAM_BLOCKASYNC); |
| numLost.mark(); |
| } |
| } |
| } |
| |
| /** |
| * Pick next event from async queue and call {@link #audit(AuditEvent)} |
| */ |
| @Override |
| public void run() { |
| assert(async); |
| while (!closed && !Thread.currentThread().isInterrupted()) { |
| try { |
| AuditEvent event = queue.poll(1000, TimeUnit.MILLISECONDS); |
| auditsInFlight.incrementAndGet(); |
| if (event == null) continue; |
| if (event.getDate() != null) { |
| queuedTime.update(new Date().getTime() - event.getDate().getTime(), TimeUnit.MILLISECONDS); |
| } |
| Timer.Context timer = requestTimes.time(); |
| audit(event); |
| numLogged.mark(); |
| totalTime.inc(timer.stop()); |
| } catch (InterruptedException e) { |
| log.warn("Interrupted while waiting for next audit log event"); |
| Thread.currentThread().interrupt(); |
| } catch (Exception ex) { |
| log.error("Exception when attempting to audit log asynchronously", ex); |
| numErrors.mark(); |
| } finally { |
| auditsInFlight.decrementAndGet(); |
| } |
| } |
| } |
| |
| /** |
| * Checks whether this event type should be logged based on "eventTypes" config parameter. |
| * |
| * @param eventType the event type to consider |
| * @return true if this event type should be logged |
| */ |
| public boolean shouldLog(EventType eventType) { |
| boolean shouldLog = eventTypes.contains(eventType.name()); |
| if (!shouldLog) { |
| if (log.isDebugEnabled()) { |
| log.debug("Event type {} is not configured for audit logging", eventType.name()); |
| } |
| } |
| return shouldLog; |
| } |
| |
| public void setFormatter(AuditEventFormatter formatter) { |
| this.formatter = formatter; |
| } |
| |
| @Override |
| public void initializeMetrics(SolrMetricsContext parentContext, final String scope) { |
| solrMetricsContext = parentContext.getChildContext(this); |
| String className = this.getClass().getSimpleName(); |
| log.debug("Initializing metrics for {}", className); |
| numErrors = solrMetricsContext.meter(this, "errors", getCategory().toString(), scope, className); |
| numLost = solrMetricsContext.meter(this, "lost", getCategory().toString(), scope, className); |
| numLogged = solrMetricsContext.meter(this, "count", getCategory().toString(), scope, className); |
| requestTimes = solrMetricsContext.timer(this, "requestTimes", getCategory().toString(), scope, className); |
| totalTime = solrMetricsContext.counter(this, "totalTime", getCategory().toString(), scope, className); |
| if (async) { |
| solrMetricsContext.gauge(this, () -> blockingQueueSize, true, "queueCapacity", getCategory().toString(), scope, className); |
| solrMetricsContext.gauge(this, () -> blockingQueueSize - queue.remainingCapacity(), true, "queueSize", getCategory().toString(), scope, className); |
| queuedTime = solrMetricsContext.timer(this, "queuedTime", getCategory().toString(), scope, className); |
| } |
| solrMetricsContext.gauge(this, () -> async, true, "async", getCategory().toString(), scope, className); |
| } |
| |
| @Override |
| public String getName() { |
| return this.getClass().getName(); |
| } |
| |
| @Override |
| public String getDescription() { |
| return "Auditlogger Plugin " + this.getClass().getName(); |
| } |
| |
| @Override |
| public Category getCategory() { |
| return Category.SECURITY; |
| } |
| |
| @Override |
| public Set<String> getMetricNames() { |
| return metricNames; |
| } |
| |
| @Override |
| public SolrMetricsContext getSolrMetricsContext() { |
| return solrMetricsContext; |
| } |
| |
| /** |
| * Interface for formatting the event |
| */ |
| public interface AuditEventFormatter { |
| String formatEvent(AuditEvent event); |
| } |
| |
| /** |
| * Event formatter that returns event as JSON string |
| */ |
| public static class JSONAuditEventFormatter implements AuditEventFormatter { |
| private static ObjectMapper mapper = new ObjectMapper() |
| .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) |
| .setSerializationInclusion(Include.NON_NULL); |
| |
| /** |
| * Formats an audit event as a JSON string |
| */ |
| @Override |
| public String formatEvent(AuditEvent event) { |
| try { |
| StringWriter sw = new StringWriter(); |
| mapper.writeValue(sw, event); |
| return sw.toString(); |
| } catch (IOException e) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error converting Event to JSON", e); |
| } |
| } |
| } |
| |
| /** |
| * Waits 30s for async queue to drain, then closes executor threads. |
| * Subclasses should either call <code>super.close()</code> or {@link #waitForQueueToDrain(int)} |
| * <b>before</b> shutting itself down to make sure they can complete logging events in the queue. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (async && executorService != null) { |
| waitForQueueToDrain(30); |
| closed = true; |
| log.info("Shutting down async Auditlogger background thread(s)"); |
| executorService.shutdownNow(); |
| try { |
| SolrMetricProducer.super.close(); |
| } catch (Exception e) { |
| throw new IOException("Exception closing", e); |
| } |
| } |
| } |
| |
| /** |
| * Blocks until the async event queue is drained |
| * @param timeoutSeconds number of seconds to wait for queue to drain |
| */ |
| protected void waitForQueueToDrain(int timeoutSeconds) { |
| if (async && executorService != null) { |
| int timeSlept = 0; |
| while ((!queue.isEmpty() || auditsInFlight.get() > 0) && timeSlept < timeoutSeconds) { |
| try { |
| if (log.isInfoEnabled()) { |
| log.info("Async auditlogger queue still has {} elements and {} audits in-flight, sleeping to drain...", queue.size(), auditsInFlight.get()); |
| } |
| Thread.sleep(1000); |
| timeSlept ++; |
| } catch (InterruptedException ignored) {} |
| } |
| } |
| } |
| |
| /** |
| * Set of rules for when audit logging should be muted. |
| */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private class MuteRules { |
| private List<List<MuteRule>> rules; |
| |
| MuteRules(Object o) { |
| rules = new ArrayList<>(); |
| if (o != null) { |
| if (o instanceof List) { |
| ((List)o).forEach(l -> { |
| if (l instanceof String) { |
| rules.add(Collections.singletonList(parseRule(l))); |
| } else if (l instanceof List) { |
| List<MuteRule> rl = new ArrayList<>(); |
| ((List) l).forEach(r -> rl.add(parseRule(r))); |
| rules.add(rl); |
| } |
| }); |
| } else { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The " + PARAM_MUTE_RULES + " configuration must be a list"); |
| } |
| } |
| } |
| |
| private MuteRule parseRule(Object ruleObj) { |
| try { |
| String rule = (String) ruleObj; |
| if (rule.startsWith("type:")) { |
| AuditEvent.RequestType muteType = AuditEvent.RequestType.valueOf(rule.substring("type:".length())); |
| return event -> event.getRequestType() != null && event.getRequestType().equals(muteType); |
| } |
| if (rule.startsWith("collection:")) { |
| return event -> event.getCollections() != null && event.getCollections().contains(rule.substring("collection:".length())); |
| } |
| if (rule.startsWith("user:")) { |
| return event -> event.getUsername() != null && event.getUsername().equals(rule.substring("user:".length())); |
| } |
| if (rule.startsWith("path:")) { |
| return event -> event.getResource() != null && event.getResource().startsWith(rule.substring("path:".length())); |
| } |
| if (rule.startsWith("ip:")) { |
| return event -> event.getClientIp() != null && event.getClientIp().equals(rule.substring("ip:".length())); |
| } |
| if (rule.startsWith("param:")) { |
| String[] kv = rule.substring("param:".length()).split("="); |
| if (kv.length == 2) { |
| return event -> event.getSolrParams() != null && kv[1].equals(event.getSolrParamAsString(kv[0])); |
| } else { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The 'param' muteRule must be of format 'param:key=value', got " + rule); |
| } |
| } |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unkonwn mute rule " + rule); |
| } catch (ClassCastException | IllegalArgumentException e) { |
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There was a problem parsing muteRules. Must be a list of valid rule strings", e); |
| } |
| } |
| |
| /** |
| * Returns true if any of the configured mute rules matches. The inner lists are ORed, while rules inside |
| * inner lists are ANDed |
| */ |
| boolean shouldMute(AuditEvent event) { |
| if (rules == null) return false; |
| return rules.stream().anyMatch(rl -> rl.stream().allMatch(r -> r.shouldMute(event))); |
| } |
| } |
| |
| public interface MuteRule { |
| boolean shouldMute(AuditEvent event); |
| } |
| } |