blob: aa66d9fafe2080ad4d457b871352c9d01facd93b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.NoSuchFileException;
import java.util.Locale;
import java.util.Map;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
/** Common base class for {@link PrimaryNode} and {@link ReplicaNode}.
*
* @lucene.experimental */
public abstract class Node implements Closeable {
public static boolean VERBOSE_FILES = true;
public static boolean VERBOSE_CONNECTIONS = false;
// Keys we store into IndexWriter's commit user data:
/** Key to store the primary gen in the commit data, which increments every time we promote a new primary, so replicas can detect when the
* primary they were talking to is changed */
public static String PRIMARY_GEN_KEY = "__primaryGen";
/** Key to store the version in the commit data, which increments every time we open a new NRT reader */
public static String VERSION_KEY = "__version";
/** Compact ordinal for this node */
protected final int id;
protected final Directory dir;
protected final SearcherFactory searcherFactory;
// Tracks NRT readers, opened from IW (primary) or opened from replicated SegmentInfos pulled across the wire (replica):
protected ReferenceManager<IndexSearcher> mgr;
/** Startup time of original test, carefully propogated to all nodes to produce consistent "seconds since start time" in messages */
public static long globalStartNS;
/** When this node was started */
public static final long localStartNS = System.nanoTime();
/** For debug logging */
protected final PrintStream printStream;
// public static final long globalStartNS;
// For debugging:
volatile String state = "idle";
/** File metadata for last sync that succeeded; we use this as a cache */
protected volatile Map<String,FileMetaData> lastFileMetaData;
public Node(int id, Directory dir, SearcherFactory searcherFactory, PrintStream printStream) {
this.id = id;
this.dir = dir;
this.searcherFactory = searcherFactory;
this.printStream = printStream;
}
/** Returns the {@link ReferenceManager} to use for acquiring and releasing searchers */
public ReferenceManager<IndexSearcher> getSearcherManager() {
return mgr;
}
/** Returns the {@link Directory} this node is writing to */
public Directory getDirectory() {
return dir;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(id=" + id + ")";
}
public abstract void commit() throws IOException;
public static void nodeMessage(PrintStream printStream, String message) {
if (printStream != null) {
long now = System.nanoTime();
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
Thread.currentThread().getName(),
message));
}
}
public static void nodeMessage(PrintStream printStream, int id, String message) {
if (printStream != null) {
long now = System.nanoTime();
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: N%d [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
id,
Thread.currentThread().getName(),
message));
}
}
public void message(String message) {
if (printStream != null) {
long now = System.nanoTime();
printStream.println(String.format(Locale.ROOT,
"%5.3fs %5.1fs: %7s %2s [%11s] %s",
(now-globalStartNS)/1000000000.,
(now-localStartNS)/1000000000.,
state, name(),
Thread.currentThread().getName(), message));
}
}
public String name() {
char mode = this instanceof PrimaryNode ? 'P' : 'R';
return mode + Integer.toString(id);
}
public abstract boolean isClosed();
public long getCurrentSearchingVersion() throws IOException {
IndexSearcher searcher = mgr.acquire();
try {
return ((DirectoryReader) searcher.getIndexReader()).getVersion();
} finally {
mgr.release(searcher);
}
}
public static String bytesToString(long bytes) {
if (bytes < 1024) {
return bytes + " b";
} else if (bytes < 1024 * 1024) {
return String.format(Locale.ROOT, "%.1f KB", bytes/1024.);
} else if (bytes < 1024 * 1024 * 1024) {
return String.format(Locale.ROOT, "%.1f MB", bytes/1024./1024.);
} else {
return String.format(Locale.ROOT, "%.1f GB", bytes/1024./1024./1024.);
}
}
/** Opens the specified file, reads its identifying information, including file length, full index header (includes the unique segment
* ID) and the full footer (includes checksum), and returns the resulting {@link FileMetaData}.
*
* <p>This returns null, logging a message, if there are any problems (the file does not exist, is corrupt, truncated, etc.).</p> */
public FileMetaData readLocalFileMetaData(String fileName) throws IOException {
Map<String,FileMetaData> cache = lastFileMetaData;
FileMetaData result;
if (cache != null) {
// We may already have this file cached from the last NRT point:
result = cache.get(fileName);
} else {
result = null;
}
if (result == null) {
// Pull from the filesystem
long checksum;
long length;
byte[] header;
byte[] footer;
try (IndexInput in = dir.openInput(fileName, IOContext.DEFAULT)) {
try {
length = in.length();
header = CodecUtil.readIndexHeader(in);
footer = CodecUtil.readFooter(in);
checksum = CodecUtil.retrieveChecksum(in);
} catch (EOFException | CorruptIndexException cie) {
// File exists but is busted: we must copy it. This happens when node had crashed, corrupting an un-fsync'd file. On init we try
// to delete such unreferenced files, but virus checker can block that, leaving this bad file.
if (VERBOSE_FILES) {
message("file " + fileName + ": will copy [existing file is corrupt]");
}
return null;
}
if (VERBOSE_FILES) {
message("file " + fileName + " has length=" + bytesToString(length));
}
} catch (FileNotFoundException | NoSuchFileException e) {
if (VERBOSE_FILES) {
message("file " + fileName + ": will copy [file does not exist]");
}
return null;
}
// NOTE: checksum is redundant w/ footer, but we break it out separately because when the bits cross the wire we need direct access to
// checksum when copying to catch bit flips:
result = new FileMetaData(header, footer, length, checksum);
}
return result;
}
}