blob: daeffc79f1cf48160a6faf1456556504a1b32e90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.document.Document;
/** Parent JVM hold this "wrapper" to refer to each child JVM. This is roughly equivalent e.g. to a client-side "sugar" API. */
class NodeProcess implements Closeable {
final Process p;
// Port sub-process is listening on
final int tcpPort;
final int id;
final Thread pumper;
// Acquired when searching or indexing wants to use this node:
final ReentrantLock lock;
final boolean isPrimary;
// Version in the commit point we opened on init:
final long initCommitVersion;
// SegmentInfos.version, which can be higher than the initCommitVersion
final long initInfosVersion;
volatile boolean isOpen = true;
final AtomicBoolean nodeIsClosing;
public NodeProcess(Process p, int id, int tcpPort, Thread pumper, boolean isPrimary, long initCommitVersion, long initInfosVersion, AtomicBoolean nodeIsClosing) {
this.p = p;
this.id = id;
this.tcpPort = tcpPort;
this.pumper = pumper;
this.isPrimary = isPrimary;
this.initCommitVersion = initCommitVersion;
this.initInfosVersion = initInfosVersion;
this.nodeIsClosing = nodeIsClosing;
assert initInfosVersion >= initCommitVersion: "initInfosVersion=" + initInfosVersion + " initCommitVersion=" + initCommitVersion;
lock = new ReentrantLock();
}
@Override
public String toString() {
if (isPrimary) {
return "P" + id + " tcpPort=" + tcpPort;
} else {
return "R" + id + " tcpPort=" + tcpPort;
}
}
public synchronized void crash() {
if (isOpen) {
isOpen = false;
p.destroy();
try {
p.waitFor();
pumper.join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
public boolean commit() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
c.flush();
c.s.shutdownOutput();
if (c.in.readByte() != 1) {
throw new RuntimeException("commit failed");
}
return true;
}
}
public void commitAsync() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_COMMIT);
c.flush();
}
}
public long getSearchingVersion() throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_GET_SEARCHING_VERSION);
c.flush();
c.s.shutdownOutput();
return c.in.readVLong();
}
}
/** Ask the primary node process to flush. We send it all currently up replicas so it can notify them about the new NRT point. Returns the newly
* flushed version, or a negative (current) version if there were no changes. */
public synchronized long flush(int atLeastMarkerCount) throws IOException {
assert isPrimary;
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_FLUSH);
c.out.writeVInt(atLeastMarkerCount);
c.flush();
c.s.shutdownOutput();
return c.in.readLong();
}
}
@Override
public void close() {
shutdown();
}
public synchronized boolean shutdown() {
lock.lock();
try {
//System.out.println("PARENT: now shutdown node=" + id + " isOpen=" + isOpen);
if (isOpen) {
// Ask the child process to shutdown gracefully:
isOpen = false;
//System.out.println("PARENT: send CMD_CLOSE to node=" + id);
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_CLOSE);
c.flush();
if (c.in.readByte() != 1) {
throw new RuntimeException("shutdown failed");
}
} catch (Throwable t) {
System.out.println("top: shutdown failed; ignoring");
t.printStackTrace(System.out);
}
try {
p.waitFor();
pumper.join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
return true;
} finally {
lock.unlock();
}
}
public void newNRTPoint(long version, long primaryGen, int primaryTCPPort) throws IOException {
try (Connection c = new Connection(tcpPort)) {
c.out.writeByte(SimpleReplicaNode.CMD_NEW_NRT_POINT);
c.out.writeVLong(version);
c.out.writeVLong(primaryGen);
c.out.writeInt(primaryTCPPort);
c.flush();
}
}
public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can index");
}
int fieldCount = 0;
String title = doc.get("title");
if (title != null) {
fieldCount++;
}
String docid = doc.get("docid");
assert docid != null;
fieldCount++;
String body = doc.get("body");
if (body != null) {
fieldCount++;
}
String marker = doc.get("marker");
if (marker != null) {
fieldCount++;
}
c.out.writeByte(isUpdate ? SimplePrimaryNode.CMD_UPDATE_DOC : SimplePrimaryNode.CMD_ADD_DOC);
c.out.writeVInt(fieldCount);
c.out.writeString("docid");
c.out.writeString(docid);
if (title != null) {
c.out.writeString("title");
c.out.writeString(title);
}
if (body != null) {
c.out.writeString("body");
c.out.writeString(body);
}
if (marker != null) {
c.out.writeString("marker");
c.out.writeString(marker);
}
c.flush();
c.in.readByte();
}
public void deleteDocument(Connection c, String docid) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can delete documents");
}
c.out.writeByte(SimplePrimaryNode.CMD_DELETE_DOC);
c.out.writeString(docid);
c.flush();
c.in.readByte();
}
public void deleteAllDocuments(Connection c) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can delete documents");
}
c.out.writeByte(SimplePrimaryNode.CMD_DELETE_ALL_DOCS);
c.flush();
c.in.readByte();
}
public void forceMerge(Connection c) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can force merge");
}
c.out.writeByte(SimplePrimaryNode.CMD_FORCE_MERGE);
c.flush();
c.in.readByte();
}
}