/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.InfoStream;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.schema.IndexSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An {@link IndexWriter} that is configured via Solr config mechanisms and, when enabled,
* reports merge and flush metrics through the core's {@link SolrMetricsContext}.
*
* @since solr 0.9
*/
public class SolrIndexWriter extends IndexWriter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// These should *only* be used for debugging or monitoring purposes
public static final AtomicLong numOpens = new AtomicLong();
public static final AtomicLong numCloses = new AtomicLong();
/** Stored into each Lucene commit to record the
* System.currentTimeMillis() when commit was called. */
public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
public static final String COMMIT_COMMAND_VERSION = "commitCommandVer";
private volatile String name;
private final DirectoryFactory directoryFactory;
private final InfoStream infoStream;
private volatile Directory directory;
// metrics
private volatile long majorMergeDocs = 512 * 1024;
private volatile Timer majorMerge;
private volatile Timer minorMerge;
private volatile Meter majorMergedDocs;
private volatile Meter majorDeletedDocs;
private volatile Counter mergeErrors;
private volatile Meter flushMeter; // original counter is package-private in IndexWriter
private volatile boolean mergeTotals = false;
private volatile boolean mergeDetails = false;
private final AtomicInteger runningMajorMerges = new AtomicInteger();
private final AtomicInteger runningMinorMerges = new AtomicInteger();
private final AtomicInteger runningMajorMergesSegments = new AtomicInteger();
private final AtomicInteger runningMinorMergesSegments = new AtomicInteger();
private final AtomicLong runningMajorMergesDocs = new AtomicLong();
private final AtomicLong runningMinorMergesDocs = new AtomicLong();
private final SolrMetricsContext solrMetricsContext;
// merge diagnostics.
private final Map<String, Long> runningMerges = new ConcurrentHashMap<>();
private final boolean releaseDirectory;
//
// public static SolrIndexWriter create(SolrCore core, String name, String path, DirectoryFactory directoryFactory,
// boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec)
// throws IOException {
// if (log.isDebugEnabled()) {
// log.debug("create(SolrCore core={}, String name={}, String path={}, DirectoryFactory directoryFactory={}, boolean create={}, IndexSchema schema={}, SolrIndexConfig config={}, IndexDeletionPolicy delPolicy={}, Codec codec={}) - start",
// core, name, path, directoryFactory, create, schema, config, delPolicy, codec);
// }
//
// SolrIndexWriter w = null;
//
// w = new SolrIndexWriter(core, name, path, directoryFactory, create, schema, config, delPolicy, codec);
//
// if (log.isDebugEnabled()) {
// log.debug(
// "create(SolrCore, String, String, DirectoryFactory, boolean, IndexSchema, SolrIndexConfig, IndexDeletionPolicy, Codec) - end");
// }
// return w;
// }
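/**
* Builds a {@link SolrIndexWriter} for the given core, obtaining the underlying {@link Directory}
* from the supplied {@link DirectoryFactory}. On failure the writer and/or directory are released
* and the cause is rethrown wrapped in a {@link SolrException} ({@link Error}s are rethrown as-is).
*
* <p>Illustrative sketch only; the variable names below are hypothetical and not taken from any
* particular caller:
* <pre>{@code
* // assumption: core, indexPath, indexConfig, deletionPolicy and codec come from the calling context
* SolrIndexWriter iw = SolrIndexWriter.buildIndexWriter(core, "IndexWriter for " + core.getName(),
*     indexPath, core.getDirectoryFactory(), false, core.getLatestSchema(),
*     indexConfig, deletionPolicy, codec, true);
* }</pre>
*/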
public static SolrIndexWriter buildIndexWriter(SolrCore core, String name, String path, DirectoryFactory directoryFactory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec, boolean commitOnClose) {
SolrIndexWriter iw = null;
Directory dir = null;
try {
dir = getDir(directoryFactory, path, config);
iw = new SolrIndexWriter(core, name, directoryFactory, dir, create, schema, config, delPolicy, codec, commitOnClose);
} catch (Throwable e) {
ParWork.propagateInterrupt(e);
SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
if (iw != null) {
try {
iw.close();
} catch (Exception e1) {
if (dir != null) {
try {
directoryFactory.release(dir);
} catch (IOException e2) {
exp.addSuppressed(e2);
}
}
exp.addSuppressed(e1);
}
} else {
if (dir != null) {
try {
directoryFactory.release(dir);
} catch (IOException e1) {
exp.addSuppressed(e1);
}
}
}
if (e instanceof Error) {
log.error("Exception constructing SolrIndexWriter", exp);
throw (Error) e;
}
throw exp;
}
return iw;
}
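/**
* Creates a writer directly from a {@link Directory} and {@link IndexWriterConfig}, without a
* {@link SolrCore}. No metrics are registered and the directory is not released on close; this
* form is used when a plain writer is needed, e.g. for maintenance tasks such as snapshot cleanup.
*/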
public SolrIndexWriter(String name, Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
this.name = name;
this.infoStream = conf.getInfoStream();
this.directory = d;
this.directoryFactory = null;
numOpens.incrementAndGet();
if (log.isDebugEnabled()) log.debug("Opened Writer {}", name);
// no metrics
mergeTotals = false;
mergeDetails = false;
solrMetricsContext = null;
this.releaseDirectory = false;
assert ObjectReleaseTracker.track(this);
}
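/**
* Creates the writer used for a core's index, applying the {@link SolrIndexConfig} (open mode,
* deletion policy, codec, commitOnClose) and registering merge/flush metrics when the
* {@code merge} or {@code mergeDetails} metrics init args are enabled.
*/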
public SolrIndexWriter(SolrCore core, String name, DirectoryFactory directoryFactory, Directory directory, boolean create, IndexSchema schema, SolrIndexConfig config, IndexDeletionPolicy delPolicy, Codec codec, boolean commitOnClose) throws IOException {
super(directory,
config.toIndexWriterConfig(core).
setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND).
setIndexDeletionPolicy(delPolicy).setCodec(codec).setCommitOnClose(commitOnClose)
);
try {
if (log.isDebugEnabled()) log.debug("Opened Writer {}", name);
this.releaseDirectory = true;
this.directory = getDirectory();
this.directoryFactory = directoryFactory;
this.name = name;
infoStream = getConfig().getInfoStream();
numOpens.incrementAndGet();
solrMetricsContext = core.getSolrMetricsContext().getChildContext(this);
if (config.metricsInfo != null && config.metricsInfo.initArgs != null) {
Object v = config.metricsInfo.initArgs.get("majorMergeDocs");
if (v != null) {
try {
majorMergeDocs = Long.parseLong(String.valueOf(v));
} catch (Exception e) {
log.warn("Invalid 'majorMergeDocs' argument, using default 512k", e);
}
}
Boolean totals = config.metricsInfo.initArgs.getBooleanArg("merge");
Boolean details = config.metricsInfo.initArgs.getBooleanArg("mergeDetails");
mergeDetails = details != null && details;
mergeTotals = totals != null && totals;
if (mergeDetails) {
mergeTotals = true; // override
majorMergedDocs = solrMetricsContext.meter("docs", SolrInfoBean.Category.INDEX.toString(), "merge", "major");
majorDeletedDocs = solrMetricsContext.meter("deletedDocs", SolrInfoBean.Category.INDEX.toString(), "merge", "major");
}
if (mergeTotals) {
minorMerge = solrMetricsContext.timer("minor", SolrInfoBean.Category.INDEX.toString(), "merge");
majorMerge = solrMetricsContext.timer("major", SolrInfoBean.Category.INDEX.toString(), "merge");
mergeErrors = solrMetricsContext.counter("errors", SolrInfoBean.Category.INDEX.toString(), "merge");
solrMetricsContext.gauge(() -> runningMajorMerges.get(), true, "running", SolrInfoBean.Category.INDEX.toString(), "merge", "major");
solrMetricsContext.gauge(() -> runningMinorMerges.get(), true, "running", SolrInfoBean.Category.INDEX.toString(), "merge", "minor");
solrMetricsContext.gauge(() -> runningMajorMergesDocs.get(), true, "running.docs", SolrInfoBean.Category.INDEX.toString(), "merge", "major");
solrMetricsContext.gauge(() -> runningMinorMergesDocs.get(), true, "running.docs", SolrInfoBean.Category.INDEX.toString(), "merge", "minor");
solrMetricsContext.gauge(() -> runningMajorMergesSegments.get(), true, "running.segments", SolrInfoBean.Category.INDEX.toString(), "merge", "major");
solrMetricsContext.gauge(() -> runningMinorMergesSegments.get(), true, "running.segments", SolrInfoBean.Category.INDEX.toString(), "merge", "minor");
flushMeter = solrMetricsContext.meter("flush", SolrInfoBean.Category.INDEX.toString());
}
}
} catch (Exception e) {
log.error("Exception creating {}", this.getClass().getName(), e);
directoryFactory.release(getDirectory());
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating IndexWriter", e);
}
// TODO:
// assert ObjectReleaseTracker.track(this);
}
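/**
* Obtains a {@link Directory} for the given path and lock type from the factory. If acquisition
* fails, any partially obtained directory is released and the cause is rethrown wrapped in a
* {@link SolrException}.
*/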
public static Directory getDir(DirectoryFactory directoryFactory, String path, SolrIndexConfig config) {
Directory dir = null;
try {
dir = directoryFactory.get(path, DirContext.DEFAULT, config.lockType);
} catch (Exception e) {
ParWork.propagateInterrupt(e);
SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
if (dir != null) try {
directoryFactory.release(dir);
} catch (IOException e1) {
exp.addSuppressed(e1);
}
throw exp;
}
return dir;
}
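/**
* Attaches the current wall-clock time and the commit command version as live commit user data,
* so they are recorded in the next Lucene commit.
*
* <p>A minimal sketch of reading the data back from a commit point, assuming {@code dir} is the
* {@link Directory} the writer committed to (hypothetical variable, not part of this class):
* <pre>{@code
* for (IndexCommit commit : DirectoryReader.listCommits(dir)) {
*   String commitTime = commit.getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
* }
* }</pre>
*/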
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
"but currently suspiciously used for replication as well")
public static void setCommitData(IndexWriter iw, long commitCommandVersion) {
if (log.isDebugEnabled()) log.debug("Calling setCommitData with IW: {} commitCommandVersion: {}", iw, commitCommandVersion);
final Map<String,String> commitData = new HashMap<>();
commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion));
iw.setLiveCommitData(commitData.entrySet());
}
/**
* Overridden to collect metrics for merges. A merge is classified as major when the number of
* live documents being merged exceeds {@code majorMergeDocs} (512k by default, configurable via
* the {@code majorMergeDocs} metrics init arg); otherwise it is counted as minor.
*/
@Override
public void merge(MergePolicy.OneMerge merge) throws IOException {
if (log.isDebugEnabled()) {
log.debug("merge(MergePolicy.OneMerge merge={}) - start", merge);
}
String segString = merge.segString();
long totalNumDocs = merge.totalNumDocs();
runningMerges.put(segString, totalNumDocs);
if (!mergeTotals) {
try {
super.merge(merge);
} finally {
runningMerges.remove(segString);
}
if (log.isDebugEnabled()) {
log.debug("merge(MergePolicy.OneMerge) - end");
}
return;
}
long deletedDocs = 0;
for (SegmentCommitInfo info : merge.segments) {
totalNumDocs -= info.getDelCount();
deletedDocs += info.getDelCount();
}
boolean major = totalNumDocs > majorMergeDocs;
int segmentsCount = merge.segments.size();
Timer.Context context;
if (major) {
runningMajorMerges.incrementAndGet();
runningMajorMergesDocs.addAndGet(totalNumDocs);
runningMajorMergesSegments.addAndGet(segmentsCount);
if (mergeDetails) {
majorMergedDocs.mark(totalNumDocs);
majorDeletedDocs.mark(deletedDocs);
}
context = majorMerge.time();
} else {
runningMinorMerges.incrementAndGet();
runningMinorMergesDocs.addAndGet(totalNumDocs);
runningMinorMergesSegments.addAndGet(segmentsCount);
context = minorMerge.time();
}
try {
super.merge(merge);
} catch (Throwable t) {
mergeErrors.inc();
throw t;
} finally {
runningMerges.remove(segString);
context.stop();
if (major) {
runningMajorMerges.decrementAndGet();
runningMajorMergesDocs.addAndGet(-totalNumDocs);
runningMajorMergesSegments.addAndGet(-segmentsCount);
} else {
runningMinorMerges.decrementAndGet();
runningMinorMergesDocs.addAndGet(-totalNumDocs);
runningMinorMergesSegments.addAndGet(-segmentsCount);
}
}
if (log.isDebugEnabled()) {
log.debug("merge(MergePolicy.OneMerge) - end");
}
}
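/**
* Returns an unmodifiable view of the currently running merges, keyed by segment description and
* valued by the total document count of the merge.
*/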
public Map<String, Object> getRunningMerges() {
if (log.isDebugEnabled()) {
log.debug("getRunningMerges() - start");
}
Map<String,Object> returnMap = Collections.unmodifiableMap(runningMerges);
if (log.isDebugEnabled()) {
log.debug("getRunningMerges() - end");
}
return returnMap;
}
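/**
* Overridden to mark the flush meter after each flush when merge metrics are enabled.
*/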
@Override
protected void doAfterFlush() throws IOException {
if (log.isTraceEnabled()) log.trace("doAfterFlush() - start");
if (flushMeter != null) { // null when the writer is used only for snapshot cleanup, or when mergeTotals == false
flushMeter.mark();
}
super.doAfterFlush();
}
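/**
* Closes the writer. Failures from {@link IndexWriter#close()} are handled via
* {@link ParWork#propagateInterrupt} rather than rethrown, and cleanup always runs to release the
* directory and unregister metrics.
*/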
@Override
public void close() throws IOException {
if (log.isDebugEnabled()) log.debug("Closing Writer {}", name);
try {
super.close();
} catch (Throwable e) {
ParWork.propagateInterrupt("Error closing IndexWriter", e);
} finally {
cleanup("close");
}
if (log.isDebugEnabled()) {
log.debug("close() - end");
}
}
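/**
* Rolls back the writer. As with {@link #close()}, failures are handled via ParWork rather than
* rethrown, and cleanup always runs.
*/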
@Override
public void rollback() throws IOException {
if (log.isDebugEnabled()) log.debug("Rollback Writer {}", name);
try {
super.rollback();
} catch (Throwable e) {
ParWork.propagateInterrupt("Exception rolling back IndexWriter", e);
} finally {
cleanup("rollback");
}
if (log.isDebugEnabled()) {
log.debug("rollback() - end");
}
}
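/**
* Common teardown for {@link #close()} and {@link #rollback()}: closes the info stream, releases
* the directory if this writer owns it, and unregisters the metrics context.
*/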
private void cleanup(String label) throws IOException {
numCloses.incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("SolrIndexWriter close {} numCloses={}", label, numCloses.get());
}
try {
if (infoStream != null) {
IOUtils.closeQuietly(infoStream);
}
if (releaseDirectory) {
if (log.isDebugEnabled()) log.debug("SolrIndexWriter release {}", directory);
directoryFactory.release(directory);
}
if (solrMetricsContext != null) {
solrMetricsContext.unregister();
}
} finally {
assert ObjectReleaseTracker.release(this);
}
if (log.isDebugEnabled()) {
log.debug("cleanup() - end");
}
}
}