| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.solr.update.processor; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.text.ParseException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.RejectedExecutionHandler; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.solr.cloud.CloudDescriptor; |
| import org.apache.solr.cloud.ZkController; |
| import org.apache.solr.common.SolrException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.apache.solr.common.cloud.DocCollection; |
| import org.apache.solr.common.cloud.Replica; |
| import org.apache.solr.common.cloud.Slice; |
| import org.apache.solr.common.util.ExecutorUtil; |
| import org.apache.solr.common.util.NamedList; |
| import org.apache.solr.common.util.SimpleOrderedMap; |
| import org.apache.solr.core.CloseHook; |
| import org.apache.solr.core.SolrCore; |
| import org.apache.solr.request.LocalSolrQueryRequest; |
| import org.apache.solr.request.SolrQueryRequest; |
| import org.apache.solr.request.SolrRequestInfo; |
| import org.apache.solr.response.SolrQueryResponse; |
| import org.apache.solr.security.PKIAuthenticationPlugin; |
| import org.apache.solr.update.AddUpdateCommand; |
| import org.apache.solr.update.CommitUpdateCommand; |
| import org.apache.solr.update.DeleteUpdateCommand; |
| import org.apache.solr.util.DateMathParser; |
| import org.apache.solr.common.util.SolrNamedThreadFactory; |
| import org.apache.solr.util.plugin.SolrCoreAware; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST; |
| import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR; |
| |
| /** |
| * <p> |
| * Update Processor Factory for managing automatic "expiration" of documents. |
| * </p> |
| * |
| * <p> |
| * The <code>DocExpirationUpdateProcessorFactory</code> provides two features related |
| * to the "expiration" of documents which can be used individually, or in combination: |
| * </p> |
| * <ol> |
| * <li>Computing expiration field values for documents from a "time to live" (TTL)</li> |
| * <li>Periodically delete documents from the index based on an expiration field</li> |
| * </ol> |
| * |
| * <p> |
| * Documents with expiration field values computed from a TTL can be be excluded from |
| * searchers using simple date based filters relative to <code>NOW</code>, or completely |
| * removed from the index using the periodic delete function of this factory. Alternatively, |
| * the periodic delete function of this factory can be used to remove any document with an |
| * expiration value - even if that expiration was explicitly set with-out leveraging the TTL |
| * feature of this factory. |
| * </p> |
| * |
| * <p> |
| * The following configuration options are supported: |
| * </p> |
| * |
| * <ul> |
| * <li><code>expirationFieldName</code> - The name of the expiration field to use |
| * in any operations (mandatory). |
| * </li> |
| * <li><code>ttlFieldName</code> - Name of a field this process should look |
| * for in each document processed, defaulting to <code>_ttl_</code>. |
| * If the specified field name exists in a document, the document field value |
| * will be parsed as a {@linkplain DateMathParser Date Math Expression} relative to |
| * <code>NOW</code> and the result will be added to the document using the |
| * <code>expirationFieldName</code>. Use <code><null name="ttlFieldName"/></code> |
| * to disable this feature. |
| * </li> |
| * <li><code>ttlParamName</code> - Name of an update request param this process should |
| * look for in each request when processing document additions, defaulting to |
| * <code>_ttl_</code>. If the the specified param name exists in an update request, |
| * the param value will be parsed as a {@linkplain DateMathParser Date Math Expression} |
| * relative to <code>NOW</code> and the result will be used as a default for any |
| * document included in that request that does not already have a value in the |
| * field specified by <code>ttlFieldName</code>. Use |
| * <code><null name="ttlParamName"/></code> to disable this feature. |
| * </li> |
| * <li><code>autoDeletePeriodSeconds</code> - Optional numeric value indicating how |
| * often this factory should trigger a delete to remove documents. If this option is |
| * used, and specifies a non-negative numeric value, a background thread will be |
| * created that will execute recurring <code>deleteByQuery</code> commands using the |
| * specified period. The delete query will remove all documents with an |
| * <code>expirationFieldName</code> up to <code>NOW</code>. |
| * </li> |
| * <li><code>autoDeleteChainName</code> - Optional name of an |
| * <code>updateRequestProcessorChain</code> to use when executing automatic deletes. |
| * If not specified, or <code><null/></code>, the default |
| * <code>updateRequestProcessorChain</code> for this collection is used. |
| * This option is ignored unless <code>autoDeletePeriodSeconds</code> is configured |
| * and is non-negative. |
| * </li> |
| * </ul> |
| * |
| * <p> |
| * For example: The configuration below will cause any document with a field named |
| * <code>_ttl_</code> to have a Date field named <code>_expire_at_</code> computed |
| * for it when added -- but no automatic deletion will happen. |
| * </p> |
| * |
| * <pre class="prettyprint"> |
| * <processor class="solr.processor.DocExpirationUpdateProcessorFactory"> |
| * <str name="expirationFieldName">_expire_at_</str> |
| * </processor></pre> |
| * |
| * <p> |
| * Alternatively, in this configuration deletes will occur automatically against the |
| * <code>_expire_at_</code> field every 5 minutes - but this processor will not |
| * automatically populate the <code>_expire_at_</code> using any sort of TTL expression. |
| * Only documents that were added with an explicit <code>_expire_at_</code> field value |
| * will ever be deleted. |
| * </p> |
| * |
| * <pre class="prettyprint"> |
| * <processor class="solr.processor.DocExpirationUpdateProcessorFactory"> |
| * <null name="ttlFieldName"/> |
| * <null name="ttlParamName"/> |
| * <int name="autoDeletePeriodSeconds">300</int> |
| * <str name="expirationFieldName">_expire_at_</str> |
| * </processor></pre> |
| * |
| * <p> |
| * This last example shows the combination of both features using a custom |
| * <code>ttlFieldName</code>: Documents with a <code>my_ttl</code> field will |
| * have an <code>_expire_at_</code> field computed, and deletes will be triggered |
| * every 5 minutes to remove documents whose |
| * <code>_expire_at_</code> field value is in the past. |
| * </p> |
| * |
| * <pre class="prettyprint"> |
| * <processor class="solr.processor.DocExpirationUpdateProcessorFactory"> |
| * <int name="autoDeletePeriodSeconds">300</int> |
| * <str name="ttlFieldName">my_ttl</str> |
| * <null name="ttlParamName"/> |
| * <str name="expirationFieldName">_expire_at_</str> |
| * </processor></pre> |
| * @since 4.8.0 |
| */ |
| public final class DocExpirationUpdateProcessorFactory |
| extends UpdateRequestProcessorFactory |
| implements SolrCoreAware { |
| |
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private static final String DEF_TTL_KEY = "_ttl_"; |
| private static final String EXP_FIELD_NAME_CONF = "expirationFieldName"; |
| private static final String TTL_FIELD_NAME_CONF = "ttlFieldName"; |
| private static final String TTL_PARAM_NAME_CONF = "ttlParamName"; |
| private static final String DEL_CHAIN_NAME_CONF = "autoDeleteChainName"; |
| private static final String DEL_PERIOD_SEC_CONF = "autoDeletePeriodSeconds"; |
| |
| private SolrCore core; |
| private ScheduledThreadPoolExecutor executor; |
| |
| private String expireField = null; |
| private String ttlField = null; |
| private String ttlParam = null; |
| |
| private String deleteChainName = null; |
| private long deletePeriodSeconds = -1L; |
| |
| private SolrException confErr(final String msg) { |
| return confErr(msg, null); |
| } |
| private SolrException confErr(final String msg, SolrException root) { |
| return new SolrException(SERVER_ERROR, this.getClass().getSimpleName()+": "+msg, root); |
| } |
| private String removeArgStr(@SuppressWarnings({"rawtypes"})final NamedList args, |
| final String arg, final String def, |
| final String errMsg) { |
| |
| if (args.indexOf(arg,0) < 0) return def; |
| |
| Object tmp = args.remove(arg); |
| if (null == tmp) return null; |
| |
| if (tmp instanceof String) return tmp.toString(); |
| |
| throw confErr(arg + " " + errMsg); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void init(@SuppressWarnings({"rawtypes"})NamedList args) { |
| |
| deleteChainName = removeArgStr(args, DEL_CHAIN_NAME_CONF, null, |
| "must be a <str> or <null/> for default chain"); |
| |
| ttlField = removeArgStr(args, TTL_FIELD_NAME_CONF, DEF_TTL_KEY, |
| "must be a <str> or <null/> to disable"); |
| ttlParam = removeArgStr(args, TTL_PARAM_NAME_CONF, DEF_TTL_KEY, |
| "must be a <str> or <null/> to disable"); |
| |
| expireField = removeArgStr(args, EXP_FIELD_NAME_CONF, null, "must be a <str>"); |
| if (null == expireField) { |
| throw confErr(EXP_FIELD_NAME_CONF + " must be configured"); |
| } |
| |
| Object tmp = args.remove(DEL_PERIOD_SEC_CONF); |
| if (null != tmp) { |
| if (! (tmp instanceof Number)) { |
| throw confErr(DEL_PERIOD_SEC_CONF + " must be an <int> or <long>"); |
| } |
| deletePeriodSeconds = ((Number)tmp).longValue(); |
| } |
| |
| super.init(args); |
| } |
| |
| @Override |
| public void inform(SolrCore core) { |
| this.core = core; |
| |
| if (null == core.getLatestSchema().getFieldTypeNoEx(expireField)) { |
| // TODO: check for managed schema and auto-add as a date field? |
| throw confErr(EXP_FIELD_NAME_CONF + " does not exist in schema: " + expireField); |
| } |
| |
| if (0 < deletePeriodSeconds) { |
| // validate that we have a chain we can work with |
| try { |
| Object ignored = core.getUpdateProcessingChain(deleteChainName); |
| } catch (SolrException e) { |
| throw confErr(DEL_CHAIN_NAME_CONF + " does not exist: " + deleteChainName, e); |
| } |
| // schedule recurring deletion |
| initDeleteExpiredDocsScheduler(core); |
| } |
| } |
| |
| private void initDeleteExpiredDocsScheduler(SolrCore core) { |
| executor = new ScheduledThreadPoolExecutor |
| (1, new SolrNamedThreadFactory("autoExpireDocs"), |
| new RejectedExecutionHandler() { |
| public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { |
| log.warn("Skipping execution of '{}' using '{}'", r, e); |
| } |
| }); |
| |
| core.addCloseHook(new CloseHook() { |
| public void postClose(SolrCore core) { |
| // update handler is gone, terminate anything that's left. |
| |
| if (executor.isTerminating()) { |
| log.info("Waiting for close of DocExpiration Executor"); |
| ExecutorUtil.shutdownAndAwaitTermination(executor); |
| } |
| } |
| public void preClose(SolrCore core) { |
| log.info("Triggering Graceful close of DocExpiration Executor"); |
| executor.shutdown(); |
| } |
| }); |
| |
| executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); |
| executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); |
| // we don't want this firing right away, since the core may not be ready |
| final long initialDelay = deletePeriodSeconds; |
| // TODO: should we make initialDelay configurable |
| // TODO: should we make initialDelay some fraction of the period? |
| executor.scheduleAtFixedRate(new DeleteExpiredDocsRunnable(this), |
| deletePeriodSeconds, |
| deletePeriodSeconds, |
| TimeUnit.SECONDS); |
| |
| } |
| |
| @Override |
| public UpdateRequestProcessor getInstance(SolrQueryRequest req, |
| SolrQueryResponse rsp, |
| UpdateRequestProcessor next ) { |
| |
| String defaultTtl = (null == ttlParam) ? null : req.getParams().get(ttlParam); |
| |
| if (null == ttlField && null == defaultTtl) { |
| // nothing to do, shortcircut ourselves out of the chain. |
| return next; |
| } else { |
| return new TTLUpdateProcessor(defaultTtl, expireField, ttlField, next); |
| } |
| } |
| |
| private static final class TTLUpdateProcessor extends UpdateRequestProcessor { |
| |
| final String defaultTtl; |
| final String expireField; |
| final String ttlField; |
| public TTLUpdateProcessor(final String defaultTtl, |
| final String expireField, |
| final String ttlField, |
| final UpdateRequestProcessor next) { |
| super(next); |
| this.defaultTtl = defaultTtl; |
| this.expireField = expireField; |
| this.ttlField = ttlField; |
| } |
| |
| @Override |
| public void processAdd(AddUpdateCommand cmd) throws IOException { |
| final SolrInputDocument doc = cmd.getSolrInputDocument(); |
| |
| final String math = doc.containsKey(ttlField) |
| ? doc.getFieldValue(ttlField).toString() : defaultTtl; |
| |
| if (null != math) { |
| try { |
| final DateMathParser dmp = new DateMathParser(); |
| // TODO: should we try to accept things like "1DAY" as well as "+1DAY" ? |
| // How? |
| // 'startsWith("+")' is a bad idea because it would cause problems with |
| // things like "/DAY+1YEAR" |
| // Maybe catch ParseException and retry with "+" prepended? |
| doc.addField(expireField, dmp.parseMath(math)); |
| } catch (ParseException pe) { |
| throw new SolrException(BAD_REQUEST, "Can't parse ttl as date math: " + math, pe); |
| } |
| } |
| |
| super.processAdd(cmd); |
| } |
| } |
| |
| /** |
| * <p> |
| * Runnable that uses the the <code>deleteChainName</code> configured for |
| * this factory to execute a delete by query (using the configured |
| * <code>expireField</code>) followed by a soft commit to re-open searchers (if needed) |
| * </p> |
| * <p> |
| * This logic is all wrapped up in a new SolrRequestInfo context with |
| * some logging to help make it obvious this background activity is happening. |
| * </p> |
| * <p> |
| * In cloud mode, this runner only triggers deletes if |
| * {@link #iAmInChargeOfPeriodicDeletes} is true. |
| * (logging is minimal in this situation) |
| * </p> |
| * |
| * @see #iAmInChargeOfPeriodicDeletes |
| */ |
| private static final class DeleteExpiredDocsRunnable implements Runnable { |
| final DocExpirationUpdateProcessorFactory factory; |
| final SolrCore core; |
| final String deleteChainName; |
| final String expireField; |
| public DeleteExpiredDocsRunnable(final DocExpirationUpdateProcessorFactory factory) { |
| this.factory = factory; |
| this.core = factory.core; |
| this.deleteChainName = factory.deleteChainName; |
| this.expireField = factory.expireField; |
| } |
| |
| public void run() { |
| // setup the request context early so the logging (including any from |
| // shouldWeDoPeriodicDelete() ) includes the core context info |
| final LocalSolrQueryRequest req = new LocalSolrQueryRequest |
| (factory.core, Collections.<String,String[]>emptyMap()); |
| try { |
| // HACK: to indicate to PKI that this is a server initiated request for the purposes |
| // of distributed requet/credential forwarding... |
| req.setUserPrincipalName(PKIAuthenticationPlugin.NODE_IS_USER); |
| |
| final SolrQueryResponse rsp = new SolrQueryResponse(); |
| rsp.addResponseHeader(new SimpleOrderedMap<>(1)); |
| SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); |
| try { |
| |
| if (! factory.iAmInChargeOfPeriodicDeletes() ) { |
| // No-Op |
| return; |
| } |
| log.info("Beginning periodic deletion of expired docs"); |
| |
| UpdateRequestProcessorChain chain = core.getUpdateProcessingChain(deleteChainName); |
| UpdateRequestProcessor proc = chain.createProcessor(req, rsp); |
| if (null == proc) { |
| log.warn("No active processors, skipping automatic deletion of expired docs using chain: {}" |
| , deleteChainName); |
| return; |
| } |
| try { |
| DeleteUpdateCommand del = new DeleteUpdateCommand(req); |
| del.setQuery("{!cache=false}" + expireField + ":[* TO " + |
| SolrRequestInfo.getRequestInfo().getNOW().toInstant() |
| + "]"); |
| proc.processDelete(del); |
| |
| // TODO: should this be more configurable? |
| // TODO: in particular: should hard commit be optional? |
| CommitUpdateCommand commit = new CommitUpdateCommand(req, false); |
| commit.softCommit = true; |
| commit.openSearcher = true; |
| proc.processCommit(commit); |
| |
| } finally { |
| try { |
| proc.finish(); |
| } finally { |
| proc.close(); |
| } |
| } |
| |
| log.info("Finished periodic deletion of expired docs"); |
| } catch (IOException ioe) { |
| log.error("IOException in periodic deletion of expired docs: ", ioe); |
| // DO NOT RETHROW: ScheduledExecutor will suppress subsequent executions |
| } catch (RuntimeException re) { |
| log.error("Runtime error in periodic deletion of expired docs: ", re); |
| // DO NOT RETHROW: ScheduledExecutor will suppress subsequent executions |
| } finally { |
| SolrRequestInfo.clearRequestInfo(); |
| } |
| } finally { |
| req.close(); |
| } |
| } |
| } |
| |
| /** |
| * <p> |
| * Helper method that returns true if the Runnable managed by this factory |
| * should be responsible of doing periodical deletes. |
| * </p> |
| * <p> |
| * In simple standalone installations this method always returns true, |
| * but in cloud mode it will be true if and only if we are currently the leader |
| * of the (active) slice with the first name (lexicographically). |
| * </p> |
| * <p> |
| * If this method returns false, it may have also logged a message letting the user |
| * know why we aren't attempting period deletion (but it will attempt to not log |
| * this excessively) |
| * </p> |
| */ |
| private boolean iAmInChargeOfPeriodicDeletes() { |
| ZkController zk = core.getCoreContainer().getZkController(); |
| |
| if (null == zk) return true; |
| |
| // This is a lot simpler then doing our own "leader" election across all replicas |
| // of all shards since: |
| // a) we already have a per shard leader |
| // b) shard names must be unique |
| // c) ClusterState is already being "watched" by ZkController, no additional zk hits |
| // d) there might be multiple instances of this factory (in multiple chains) per |
| // collection, so picking an ephemeral node name for our election would be tricky |
| |
| CloudDescriptor desc = core.getCoreDescriptor().getCloudDescriptor(); |
| String col = desc.getCollectionName(); |
| |
| DocCollection docCollection = zk.getClusterState().getCollection(col); |
| if (docCollection.getActiveSlicesArr().length == 0) { |
| log.error("Collection {} has no active Slices?", col); |
| return false; |
| } |
| List<Slice> slices = new ArrayList<>(Arrays.asList(docCollection.getActiveSlicesArr())); |
| Collections.sort(slices, COMPARE_SLICES_BY_NAME); |
| Replica firstSliceLeader = slices.get(0).getLeader(); |
| if (null == firstSliceLeader) { |
| log.warn("Slice in charge of periodic deletes for {} does not currently have a leader", |
| col); |
| return false; |
| } |
| String leaderInCharge = firstSliceLeader.getName(); |
| String myCoreNodeName = desc.getCoreNodeName(); |
| |
| boolean inChargeOfDeletesRightNow = leaderInCharge.equals(myCoreNodeName); |
| |
| if (previouslyInChargeOfDeletes && ! inChargeOfDeletesRightNow) { |
| // don't spam the logs constantly, just log when we know that we're not the guy |
| // (the first time -- or anytime we were, but no longer are) |
| log.info("Not currently in charge of periodic deletes for this collection, {}", |
| "will not trigger delete or log again until this changes"); |
| } |
| |
| previouslyInChargeOfDeletes = inChargeOfDeletesRightNow; |
| return inChargeOfDeletesRightNow; |
| } |
| |
| /** @see #iAmInChargeOfPeriodicDeletes */ |
| private volatile boolean previouslyInChargeOfDeletes = true; |
| |
| private static final Comparator<Slice> COMPARE_SLICES_BY_NAME = (a, b) -> a.getName().compareTo(b.getName()); |
| |
| } |
| |
| |
| |