blob: ce75a2902a85a19c802efdc93f76f934e89cf44b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.RAMOutputStream;
/** This is a stupid yet functional transaction log: it never fsync's, never prunes, it's over-synchronized, it hard-wires id field name to "docid", can
* only handle specific docs/fields used by this test, etc. It's just barely enough to show how a translog could work on top of NRT
* replication to guarantee no data loss when nodes crash */
class SimpleTransLog implements Closeable {
final FileChannel channel;
final RAMOutputStream buffer = new RAMOutputStream();
final byte[] intBuffer = new byte[4];
final ByteBuffer intByteBuffer = ByteBuffer.wrap(intBuffer);
private final static byte OP_ADD_DOCUMENT = (byte) 0;
private final static byte OP_UPDATE_DOCUMENT = (byte) 1;
private final static byte OP_DELETE_DOCUMENTS = (byte) 2;
public SimpleTransLog(Path path) throws IOException {
channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
}
public synchronized long getNextLocation() throws IOException {
return channel.position();
}
/** Appends an addDocument op */
public synchronized long addDocument(String id, Document doc) throws IOException {
assert buffer.getFilePointer() == 0;
buffer.writeByte(OP_ADD_DOCUMENT);
encode(id, doc);
return flushBuffer();
}
/** Appends an updateDocument op */
public synchronized long updateDocument(String id, Document doc) throws IOException {
assert buffer.getFilePointer() == 0;
buffer.writeByte(OP_UPDATE_DOCUMENT);
encode(id, doc);
return flushBuffer();
}
/** Appends a deleteDocuments op */
public synchronized long deleteDocuments(String id) throws IOException {
assert buffer.getFilePointer() == 0;
buffer.writeByte(OP_DELETE_DOCUMENTS);
buffer.writeString(id);
return flushBuffer();
}
/** Writes buffer to the file and returns the start position. */
private synchronized long flushBuffer() throws IOException {
long pos = channel.position();
int len = (int) buffer.getFilePointer();
byte[] bytes = new byte[len];
buffer.writeTo(bytes, 0);
buffer.reset();
intBuffer[0] = (byte) (len >> 24);
intBuffer[1] = (byte) (len >> 16);
intBuffer[2] = (byte) (len >> 8);
intBuffer[3] = (byte) len;
intByteBuffer.limit(4);
intByteBuffer.position(0);
writeBytesToChannel(intByteBuffer);
writeBytesToChannel(ByteBuffer.wrap(bytes));
return pos;
}
private void writeBytesToChannel(ByteBuffer src) throws IOException {
int left = src.limit();
while (left != 0) {
left -= channel.write(src);
}
}
private void readBytesFromChannel(long pos, ByteBuffer dest) throws IOException {
int left = dest.limit() - dest.position();
long end = pos + left;
while (pos < end) {
int inc = channel.read(dest, pos);
if (inc < 0) {
throw new EOFException();
}
pos += inc;
}
}
/** Replays ops between start and end location against the provided writer. Can run concurrently with ongoing operations. */
public void replay(NodeProcess primary, long start, long end) throws IOException {
try (Connection c = new Connection(primary.tcpPort)) {
c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
byte[] intBuffer = new byte[4];
ByteBuffer intByteBuffer = ByteBuffer.wrap(intBuffer);
ByteArrayDataInput in = new ByteArrayDataInput();
long pos = start;
while (pos < end) {
intByteBuffer.position(0);
intByteBuffer.limit(4);
readBytesFromChannel(pos, intByteBuffer);
pos += 4;
int len = ((intBuffer[0] & 0xff) << 24) |
(intBuffer[1] & 0xff) << 16 |
(intBuffer[2] & 0xff) << 8 |
(intBuffer[3] & 0xff);
byte[] bytes = new byte[len];
readBytesFromChannel(pos, ByteBuffer.wrap(bytes));
pos += len;
in.reset(bytes);
byte op = in.readByte();
//System.out.println("xlog: replay op=" + op);
switch (op) {
case 0:
// We replay add as update:
replayAddDocument(c, primary, in);
break;
case 1:
// We replay add as update:
replayAddDocument(c, primary, in);
break;
case 2:
replayDeleteDocuments(c, primary, in);
break;
default:
throw new CorruptIndexException("invalid operation " + op, in);
}
}
assert pos == end;
//System.out.println("xlog: done replay");
c.out.writeByte(SimplePrimaryNode.CMD_INDEXING_DONE);
c.flush();
//System.out.println("xlog: done flush");
c.in.readByte();
//System.out.println("xlog: done readByte");
}
}
private void replayAddDocument(Connection c, NodeProcess primary, DataInput in) throws IOException {
String id = in.readString();
Document doc = new Document();
doc.add(new StringField("docid", id, Field.Store.YES));
String title = readNullableString(in);
if (title != null) {
doc.add(new StringField("title", title, Field.Store.NO));
doc.add(new TextField("titleTokenized", title, Field.Store.NO));
}
String body = readNullableString(in);
if (body != null) {
doc.add(new TextField("body", body, Field.Store.NO));
}
String marker = readNullableString(in);
if (marker != null) {
//TestStressNRTReplication.message("xlog: replay marker=" + id);
doc.add(new StringField("marker", marker, Field.Store.YES));
}
// For both add and update originally, we use updateDocument to replay,
// because the doc could in fact already be in the index:
// nocomit what if this fails?
primary.addOrUpdateDocument(c, doc, false);
}
private void replayDeleteDocuments(Connection c, NodeProcess primary, DataInput in) throws IOException {
String id = in.readString();
// nocomit what if this fails?
primary.deleteDocument(c, id);
}
/** Encodes doc into buffer. NOTE: this is NOT general purpose! It only handles the fields used in this test! */
private synchronized void encode(String id, Document doc) throws IOException {
assert id.equals(doc.get("docid")): "id=" + id + " vs docid=" + doc.get("docid");
buffer.writeString(id);
writeNullableString(doc.get("title"));
writeNullableString(doc.get("body"));
writeNullableString(doc.get("marker"));
}
private synchronized void writeNullableString(String s) throws IOException {
if (s == null) {
buffer.writeByte((byte) 0);
} else {
buffer.writeByte((byte) 1);
buffer.writeString(s);
}
}
private String readNullableString(DataInput in) throws IOException {
byte b = in.readByte();
if (b == 0) {
return null;
} else if (b == 1) {
return in.readString();
} else {
throw new CorruptIndexException("invalid string lead byte " + b, in);
}
}
@Override
public synchronized void close() throws IOException {
channel.close();
}
}