/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.store.RAMOutputStream;
import org.apache.lucene.util.ThreadInterruptedException;
/*
* This just asks IndexWriter to open a new NRT reader, in order to publish a new NRT point. This could be improved if we separated out 1)
* the NRT flush (and incRef of the SIS) from 2) opening a new reader, but this is tricky with IW's concurrency, and it would also be hard-ish to share
* IW's reader pool with our searcher manager. So we do the simpler solution for now, but that adds some unnecessary latency to NRT refresh on
* replicas, since step 2) could otherwise be done concurrently with replicas copying files over.
*/
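/*
* A minimal usage sketch (hypothetical, not from this file): MyPrimaryNode is an assumed subclass that
* implements preCopyMergedSegmentFiles, and openWriter() is an assumed helper that opens the IndexWriter:
*
*   IndexWriter writer = openWriter();
*   PrimaryNode primary = new MyPrimaryNode(writer, 0, 1, -1, new SearcherFactory(), System.out);
*   if (primary.flushAndRefresh()) {
*     // a new NRT point was published; replicas can now request it via getCopyState()
*     long version = primary.getCopyStateVersion();
*   }
*/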
/** Node that holds an IndexWriter, indexing documents into its local index.
*
* @lucene.experimental */
public abstract class PrimaryNode extends Node {
// Current NRT segment infos, incRef'd with IndexWriter.deleter:
private SegmentInfos curInfos;
protected final IndexWriter writer;
// IncRef'd state of the last published NRT point; when a replica comes asking, we give it this as the current NRT point:
private CopyState copyState;
protected final long primaryGen;
/** Contains files of merged segments that have been copied to all running replicas (as of when that merge started warming). */
final Set<String> finishedMergedFiles = Collections.synchronizedSet(new HashSet<String>());
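// Number of in-flight NRT point copies: incremented in getCopyState, decremented in releaseCopyState: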
private final AtomicInteger copyingCount = new AtomicInteger();
public PrimaryNode(IndexWriter writer, int id, long primaryGen, long forcePrimaryVersion,
SearcherFactory searcherFactory, PrintStream printStream) throws IOException {
super(id, writer.getDirectory(), searcherFactory, printStream);
message("top: now init primary");
this.writer = writer;
this.primaryGen = primaryGen;
try {
// So that when the primary node's IndexWriter finishes a merge, but before it cuts over to the merged segment,
// it copies the merged files out to the replicas. This ensures the whole system's NRT latency remains low even when a
// large merge completes:
writer.getConfig().setMergedSegmentWarmer(new PreCopyMergedSegmentWarmer(this));
message("IWC:\n" + writer.getConfig());
message("dir:\n" + writer.getDirectory());
message("commitData: " + writer.getLiveCommitData());
// Record our primaryGen in the userData, and set initial version to 0:
Map<String,String> commitData = new HashMap<>();
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
if (iter != null) {
for(Map.Entry<String,String> ent : iter) {
commitData.put(ent.getKey(), ent.getValue());
}
}
commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
if (commitData.get(VERSION_KEY) == null) {
commitData.put(VERSION_KEY, "0");
message("add initial commitData version=0");
} else {
message("keep current commitData version=" + commitData.get(VERSION_KEY));
}
writer.setLiveCommitData(commitData.entrySet(), false);
// We forcefully advance the SIS version to an unused future version. This is necessary if the previous primary crashed and we are
// starting up on an "older" index, else versions can be illegally reused but show different results:
if (forcePrimaryVersion != -1) {
message("now forcePrimaryVersion to version=" + forcePrimaryVersion);
writer.advanceSegmentInfosVersion(forcePrimaryVersion);
}
mgr = new SearcherManager(writer, true, true, searcherFactory);
setCurrentInfos(Collections.<String>emptySet());
message("init: infos version=" + curInfos.getVersion());
} catch (Throwable t) {
message("init: exception");
t.printStackTrace(printStream);
throw new RuntimeException(t);
}
}
/** Returns the current primary generation, which is incremented each time a new primary is started for this index */
public long getPrimaryGen() {
return primaryGen;
}
// TODO: in the future, we should separate "flush" (returns an incRef'd SegmentInfos) from "refresh" (open a new NRT reader from
// IndexWriter) so that the latter can be done concurrently while copying files out to replicas, minimizing the refresh time on the
// replicas. But fixing this is tricky because e.g. IndexWriter may complete a big merge just after returning the incRef'd SegmentInfos
// and before we can open a new reader, causing us to close the just-merged readers only to then open them again from the (now stale)
// SegmentInfos. To fix this "properly" I think IW.inc/decRefDeleter must also incRef the ReaderPool entry.
/** Flush all index operations to disk and open a new near-real-time reader, publishing a
* new NRT point to make the changes visible to searching. Returns true if there were changes. */
public boolean flushAndRefresh() throws IOException {
message("top: now flushAndRefresh");
Set<String> completedMergeFiles;
synchronized(finishedMergedFiles) {
completedMergeFiles = Collections.unmodifiableSet(new HashSet<>(finishedMergedFiles));
}
mgr.maybeRefreshBlocking();
boolean result = setCurrentInfos(completedMergeFiles);
if (result) {
message("top: opened NRT reader version=" + curInfos.getVersion());
finishedMergedFiles.removeAll(completedMergeFiles);
message("flushAndRefresh: version=" + curInfos.getVersion() + " completedMergeFiles=" + completedMergeFiles + " finishedMergedFiles=" + finishedMergedFiles);
} else {
message("top: no changes in flushAndRefresh; still version=" + curInfos.getVersion());
}
return result;
}
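/** Returns the version of the last published NRT point (the current copy state). */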
public long getCopyStateVersion() {
return copyState.version;
}
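/** Returns the version recorded under VERSION_KEY in the writer's commit user data. */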
public synchronized long getLastCommitVersion() {
Iterable<Map.Entry<String,String>> iter = writer.getLiveCommitData();
assert iter != null;
for(Map.Entry<String,String> ent : iter) {
if (ent.getKey().equals(VERSION_KEY)) {
return Long.parseLong(ent.getValue());
}
}
// In ctor we always install an initial version:
throw new AssertionError("missing VERSION_KEY");
}
@Override
public void commit() throws IOException {
Map<String,String> commitData = new HashMap<>();
commitData.put(PRIMARY_GEN_KEY, Long.toString(primaryGen));
// TODO (opto): it's a bit wasteful that we put "last refresh" version here, not the actual version we are committing, because it means
// on xlog replay we are replaying more ops than necessary.
commitData.put(VERSION_KEY, Long.toString(copyState.version));
message("top: commit commitData=" + commitData);
writer.setLiveCommitData(commitData.entrySet(), false);
writer.commit();
}
/** IncRef the current CopyState and return it */
public synchronized CopyState getCopyState() throws IOException {
ensureOpen(false);
//message("top: getCopyState replicaID=" + replicaID + " replicaNodeID=" + replicaNodeID + " version=" + curInfos.getVersion() + " infos=" + curInfos.toString());
assert curInfos == copyState.infos;
writer.incRefDeleter(copyState.infos);
int count = copyingCount.incrementAndGet();
assert count > 0;
return copyState;
}
/** Called once replica is done (or failed) copying an NRT point */
public void releaseCopyState(CopyState copyState) throws IOException {
//message("top: releaseCopyState version=" + copyState.version);
assert copyState.infos != null;
writer.decRefDeleter(copyState.infos);
int count = copyingCount.decrementAndGet();
assert count >= 0;
}
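// A caller serving a replica would typically pair these two methods (a sketch; sendFilesToReplica is hypothetical):
//
//   CopyState copyState = primary.getCopyState();
//   try {
//     sendFilesToReplica(copyState);
//   } finally {
//     primary.releaseCopyState(copyState);
//   }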
@Override
public boolean isClosed() {
return isClosed(false);
}
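/** Returns true if this node is closed, or if it is closing and {@code allowClosing} is false. */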
boolean isClosed(boolean allowClosing) {
return "closed".equals(state) || (allowClosing == false && "closing".equals(state));
}
private void ensureOpen(boolean allowClosing) {
if (isClosed(allowClosing)) {
throw new AlreadyClosedException(state);
}
}
/** Steals incoming infos refCount; returns true if there were changes. */
private synchronized boolean setCurrentInfos(Set<String> completedMergeFiles) throws IOException {
IndexSearcher searcher = null;
SegmentInfos infos;
try {
searcher = mgr.acquire();
infos = ((StandardDirectoryReader) searcher.getIndexReader()).getSegmentInfos();
} finally {
if (searcher != null) {
mgr.release(searcher);
}
}
if (curInfos != null && infos.getVersion() == curInfos.getVersion()) {
// no change
message("top: skip switch to infos: version=" + infos.getVersion() + " is unchanged: " + infos.toString());
return false;
}
SegmentInfos oldInfos = curInfos;
writer.incRefDeleter(infos);
curInfos = infos;
if (oldInfos != null) {
writer.decRefDeleter(oldInfos);
}
message("top: switch to infos=" + infos.toString() + " version=" + infos.getVersion());
// Serialize the SegmentInfos:
RAMOutputStream out = new RAMOutputStream(new RAMFile(), true);
infos.write(out);
byte[] infosBytes = new byte[(int) out.getFilePointer()];
out.writeTo(infosBytes, 0);
Map<String,FileMetaData> filesMetaData = new HashMap<String,FileMetaData>();
for(SegmentCommitInfo info : infos) {
for(String fileName : info.files()) {
FileMetaData metaData = readLocalFileMetaData(fileName);
// NOTE: we hold a refCount on this infos, so this file better exist:
assert metaData != null: "file \"" + fileName + "\" is missing metadata";
assert filesMetaData.containsKey(fileName) == false;
filesMetaData.put(fileName, metaData);
}
}
lastFileMetaData = Collections.unmodifiableMap(filesMetaData);
message("top: set copyState primaryGen=" + primaryGen + " version=" + infos.getVersion() + " files=" + filesMetaData.keySet());
copyState = new CopyState(lastFileMetaData,
infos.getVersion(), infos.getGeneration(), infosBytes, completedMergeFiles,
primaryGen, curInfos);
return true;
}
private synchronized void waitForAllRemotesToClose() throws IOException {
// Wait for replicas to finish or crash:
while (true) {
int count = copyingCount.get();
if (count == 0) {
return;
}
message("pendingCopies: " + count);
try {
wait(10);
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
}
@Override
public void close() throws IOException {
state = "closing";
message("top: close primary");
synchronized (this) {
waitForAllRemotesToClose();
if (curInfos != null) {
writer.decRefDeleter(curInfos);
curInfos = null;
}
}
mgr.close();
writer.rollback();
dir.close();
state = "closed";
}
/** Called when a merge has finished, but before IW switches to the merged segment */
protected abstract void preCopyMergedSegmentFiles(SegmentCommitInfo info, Map<String,FileMetaData> files) throws IOException;
}