blob: 64b8c149c9fa750c3b2cff89530bb62215ee00d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.invoke.MethodHandles;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.util.Collection;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.update.processor.CdcrUpdateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends {@link org.apache.solr.update.TransactionLog} to:
* <ul>
* <li>automatically reopen the output stream if its reference count has reached 0. This is achieved by extending
* methods {@link #incref()}, {@link #close()} and {@link #reopenOutputStream()}.</li>
* <li>encode the number of records in the tlog file in the last commit record. The number of records will be
* decoded and reused if the tlog file is reopened. This is achieved by extending the constructor, and the
* methods {@link #writeCommit(CommitUpdateCommand)} and {@link #getReader(long)}.</li>
* </ul>
* @deprecated since 8.6
*/
@Deprecated
public class CdcrTransactionLog extends TransactionLog {
// Set to true by the openExisting constructor when an existing tlog is opened and no record count
// could be read from it (count == 0). While true, CdcrLogReader#next() copies its running record
// count into this log's numRecords. Reset to false by writeCommit once a commit has been appended.
private boolean isReplaying;
long startVersion; // (absolute) version of the first element of this transaction log
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Snapshot of log.isDebugEnabled() taken when this instance is created; it is not re-evaluated later.
private boolean debug = log.isDebugEnabled();
/**
 * Creates a CDCR transaction log for the given file by delegating to the parent constructor, then
 * derives {@link #startVersion} from the numeric suffix that follows the last {@code '.'} of the
 * file name. The log is not considered to be replaying.
 *
 * @param tlogFile the transaction log file; the part of its name after the last dot must parse as a long
 * @param globalStrings passed unchanged to the parent constructor
 * @throws NumberFormatException if the file name suffix is not a valid long
 */
CdcrTransactionLog(File tlogFile, Collection<String> globalStrings) {
  super(tlogFile, globalStrings);
  // The starting version number will be used to seek more efficiently tlogs
  // and to filter out tlog files during replication (in ReplicationHandler#getTlogFileList)
  final String name = tlogFile.getName();
  final int suffixStart = name.lastIndexOf('.') + 1;
  final long parsedVersion = Long.parseLong(name.substring(suffixStart));
  this.startVersion = Math.abs(parsedVersion);
  this.isReplaying = false;
}
/**
 * Creates a CDCR transaction log, optionally over an already existing file.
 *
 * <p>{@link #startVersion} is derived from the numeric suffix that follows the last {@code '.'} of
 * the file name. When {@code openExisting} is set, the record count is read back from the end of the
 * file; if that count is 0 the log is flagged as replaying, so that the count gets rebuilt while the
 * log is read and a commit is appended afterwards.
 *
 * @param tlogFile the transaction log file; the part of its name after the last dot must parse as a long
 * @param globalStrings passed unchanged to the parent constructor
 * @param openExisting whether an existing file is being opened; passed to the parent constructor
 * @throws NumberFormatException if the file name suffix is not a valid long
 */
CdcrTransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
  super(tlogFile, globalStrings, openExisting);
  // The starting version number will be used to seek more efficiently tlogs
  final String name = tlogFile.getName();
  final int suffixStart = name.lastIndexOf('.') + 1;
  final long parsedVersion = Long.parseLong(name.substring(suffixStart));
  this.startVersion = Math.abs(parsedVersion);

  if (openExisting) {
    numRecords = this.readNumRecords();
    // Reopening an existing tlog with a record count of 0 means we are replaying
    // the log and a commit will be appended.
    if (numRecords == 0) {
      isReplaying = true;
    }
  } else {
    numRecords = 0;
  }
}
/**
 * Returns the number of records in the log (currently includes the header and an optional commit).
 * Delegates to the parent implementation; re-declared here as {@code public}.
 *
 * @return the record count reported by the parent class
 */
@Override
public int numRecords() {
  return super.numRecords();
}
/**
 * Reads the record count stored near the end of the tlog file. The last record of the file is
 * expected to be a commit that carries a 4 byte integer encoding the number of records in the file.
 *
 * @return the decoded record count, or 0 when the file does not end with a commit, when the file is
 *     too short to hold the count, or when an {@link IOException} occurs (the error is logged)
 */
private int readNumRecords() {
  try {
    if (!endsWithCommit()) {
      return 0;
    }
    // Counting back from the end of the file: 4 bytes for the record size, the length of the
    // end message plus 1 byte for its value tag, and 4 bytes for the number of records.
    final long countOffset = fos.size() - 4 - END_MESSAGE.length() - 1 - 4;
    if (countOffset < 0) {
      return 0;
    }
    try (ChannelFastInputStream in = new ChannelFastInputStream(channel, countOffset)) {
      return in.readInt();
    }
  } catch (IOException e) {
    log.error("Error while reading number of records in tlog {}", this, e);
    return 0;
  }
}
/**
 * Appends an add record, or an in-place update record, to the log.
 *
 * <p>The record is first encoded into a memory buffer and then copied to the log's output stream
 * while holding this log's monitor. Compared to a plain add, the record carries an extra boolean
 * telling whether the request's parameter string contains {@link CdcrUpdateProcessor#CDCR_UPDATE}.
 *
 * @param cmd the add command to log
 * @param prevPointer pointer to the previous entry for in-place updates, {@code -1} otherwise
 * @return the position in the log at which the record starts
 * @throws SolrException with {@code SERVER_ERROR} if writing fails with an {@link IOException}
 */
@Override
public long write(AddUpdateCommand cmd, long prevPointer) {
  assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
  LogCodec codec = new LogCodec(resolver);
  SolrInputDocument sdoc = cmd.getSolrInputDocument();
  try {
    checkWriteHeader(codec, sdoc);

    // Adaptive buffer sizing: grow a bit beyond the size of the last add, capped at 1 MiB.
    // Unsynchronized access of lastAddSize should be fine.
    final int previousSize = lastAddSize;
    final int capacity = Math.min(1024*1024, previousSize+(previousSize>>3)+256);
    MemOutputStream buffer = new MemOutputStream(new byte[capacity]);
    codec.init(buffer);

    // Record prefix: differs between in-place updates (6 entries) and plain adds (4 entries).
    if (cmd.isInPlaceUpdate()) {
      codec.writeTag(JavaBinCodec.ARR, 6);
      codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
      codec.writeLong(cmd.getVersion());
      codec.writeLong(prevPointer);
      codec.writeLong(cmd.prevVersion);
    } else {
      codec.writeTag(JavaBinCodec.ARR, 4);
      codec.writeInt(UpdateLog.ADD); // should just take one byte
      codec.writeLong(cmd.getVersion());
    }

    // Record suffix, common to both kinds. If the update is received via a cdcr source, the
    // boolean entry is true; CdcrReplicator.isTargetCluster() checks that particular boolean to
    // accept or discard the update to forward to its own target cluster.
    final boolean fromCdcrSource = cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE);
    codec.writePrimitive(fromCdcrSource);
    codec.writeSolrInputDocument(cmd.getSolrInputDocument());

    lastAddSize = (int)buffer.size();

    synchronized (this) {
      final long recordStart = fos.size(); // if we had flushed, this should be equal to channel.position()
      assert recordStart != 0;
      buffer.writeAll(fos);
      endRecord(recordStart);
      // no flush here: flushing is done later
      return recordStart;
    }
  } catch (IOException e) {
    // TODO: reset our file pointer back to "pos", the start of this record.
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
  }
}
/**
 * Appends a delete-by-id record to the log.
 *
 * <p>The record holds four entries: the operation code, the version, the indexed id bytes and a
 * boolean telling whether the request's parameter string contains
 * {@link CdcrUpdateProcessor#CDCR_UPDATE}.
 *
 * @param cmd the delete command to log
 * @return the position in the log at which the record starts
 * @throws SolrException with {@code SERVER_ERROR} if writing fails with an {@link IOException}
 */
@Override
public long writeDelete(DeleteUpdateCommand cmd) {
  LogCodec codec = new LogCodec(resolver);
  try {
    checkWriteHeader(codec, null);

    final BytesRef id = cmd.getIndexedId();
    MemOutputStream buffer = new MemOutputStream(new byte[20 + id.length]);
    codec.init(buffer);
    codec.writeTag(JavaBinCodec.ARR, 4);
    codec.writeInt(UpdateLog.DELETE); // should just take one byte
    codec.writeLong(cmd.getVersion());
    codec.writeByteArray(id.bytes, id.offset, id.length);

    // If the update is received via a cdcr source, the extra boolean entry is true;
    // CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the
    // update to forward to its own target cluster.
    final boolean fromCdcrSource = cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE);
    codec.writePrimitive(fromCdcrSource);

    synchronized (this) {
      final long recordStart = fos.size(); // if we had flushed, this should be equal to channel.position()
      assert recordStart != 0;
      buffer.writeAll(fos);
      endRecord(recordStart);
      // no flush here: flushing is done later
      return recordStart;
    }
  } catch (IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Appends a delete-by-query record to the log.
 *
 * <p>The record holds four entries: the operation code, the version, the query string and a
 * boolean telling whether the request's parameter string contains
 * {@link CdcrUpdateProcessor#CDCR_UPDATE}.
 *
 * @param cmd the delete command whose {@code query} is logged
 * @return the position in the log at which the record starts
 * @throws SolrException with {@code SERVER_ERROR} if writing fails with an {@link IOException}
 */
@Override
public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
  LogCodec codec = new LogCodec(resolver);
  try {
    checkWriteHeader(codec, null);

    MemOutputStream buffer = new MemOutputStream(new byte[20 + (cmd.query.length())]);
    codec.init(buffer);
    codec.writeTag(JavaBinCodec.ARR, 4);
    codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
    codec.writeLong(cmd.getVersion());
    codec.writeStr(cmd.query);

    // If the update is received via a cdcr source, the extra boolean entry is true;
    // CdcrReplicator.isTargetCluster() checks that particular boolean to accept or discard the
    // update to forward to its own target cluster.
    final boolean fromCdcrSource = cmd.getReq().getParamString().contains(CdcrUpdateProcessor.CDCR_UPDATE);
    codec.writePrimitive(fromCdcrSource);

    synchronized (this) {
      final long recordStart = fos.size(); // if we had flushed, this should be equal to channel.position()
      buffer.writeAll(fos);
      endRecord(recordStart);
      // no flush here: flushing is done later
      return recordStart;
    }
  } catch (IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Appends a commit record to the log and flushes the output stream.
 *
 * <p>In addition to the operation code, the version and the end message, the commit record stores
 * the number of records in the file as a plain 4 byte integer, so that it can be read back when
 * the file is reopened. Writing the commit also ends the replaying state of this log.
 *
 * @param cmd the commit command to log
 * @return the position in the log at which the commit record starts
 * @throws SolrException with {@code SERVER_ERROR} if writing fails with an {@link IOException}
 */
@Override
public long writeCommit(CommitUpdateCommand cmd) {
  final LogCodec codec = new LogCodec(resolver);
  try {
    synchronized (this) {
      long commitStart = fos.size(); // if we had flushed, this should be equal to channel.position()
      if (commitStart == 0) {
        // Nothing written yet: emit the log header first, the commit starts right after it.
        writeLogHeader(codec);
        commitStart = fos.size();
      }

      codec.init(fos);
      codec.writeTag(JavaBinCodec.ARR, 4);
      codec.writeInt(UpdateLog.COMMIT); // should just take one byte
      codec.writeLong(cmd.getVersion());
      // Enforce the encoding of a plain integer, to simplify decoding.
      codec.writeTag(JavaBinCodec.INT);
      // The number of records in the file; +1 to account for the commit operation being written.
      fos.writeInt(numRecords + 1);
      codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
      endRecord(commitStart);

      fos.flush(); // flush since this will be the last record in a log file
      assert fos.size() == channel.size();

      // We have replayed and appended a commit record with the number of records in the file.
      isReplaying = false;
      return commitStart;
    }
  } catch (IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  }
}
/**
 * Returns a reader that can be used while a log is still in use.
 * Currently only *one* LogReader may be outstanding, and that log may only
 * be used from a single thread.
 *
 * @param startingPos the position handed to the reader's constructor
 * @return a new {@link CdcrLogReader} bound to this log
 */
@Override
public LogReader getReader(long startingPos) {
  final CdcrLogReader reader = new CdcrLogReader(startingPos);
  return reader;
}
/**
 * Log reader that counts the records it returns. While the enclosing log is replaying, the running
 * count is copied into the enclosing log's record count so that the next commit can encode it.
 */
public class CdcrLogReader extends LogReader {
  // Starts at 1 to account for the header record.
  private int numRecords = 1;

  public CdcrLogReader(long startingPos) {
    super(startingPos);
  }

  /**
   * Reads the next record through the parent reader and, when one was returned, updates the record
   * counters.
   *
   * @return the object returned by the parent reader, possibly {@code null}
   */
  @Override
  public Object next() throws IOException, InterruptedException {
    final Object record = super.next();
    if (record == null) {
      return null;
    }

    this.numRecords++;
    // We are replaying the log. We need to update the number of records for the writeCommit.
    if (isReplaying) {
      synchronized (CdcrTransactionLog.this) {
        CdcrTransactionLog.this.numRecords = this.numRecords;
      }
    }
    return record;
  }
}
/**
 * Increments the reference count. When the count was 0 before the increment, the output stream
 * is reopened.
 */
@Override
public void incref() {
  if (refcount.getAndIncrement() != 0) {
    return;
  }
  // The refcount was 0: the output stream has to be reopened (synchronised with this).
  reopenOutputStream();
}
/**
 * Modified to act like {@link #incref()} in order to be compatible with {@link UpdateLog#recoverFromLog()}.
 * Otherwise, we would have to duplicate the method {@link UpdateLog#recoverFromLog()} in
 * {@link org.apache.solr.update.CdcrUpdateLog} and change the call
 * {@code if (!ll.try_incref()) continue; } to {@code incref(); }.
 *
 * @return always {@code true}
 */
@Override
public boolean try_incref() {
  incref();
  return true;
}
/**
 * Flushes and closes the output stream of this log, drops the references to the underlying
 * stream, channel and file, and deletes the tlog file when {@code deleteOnClose} is set.
 *
 * <p>If flushing or closing the output stream fails, the underlying file is still closed on a
 * best-effort basis and the references are still dropped, so that the file handle is not leaked;
 * the original failure is then reported. A failure to delete the file is logged and otherwise
 * ignored.
 *
 * @throws SolrException with {@code SERVER_ERROR} if flushing or closing fails with an {@link IOException}
 */
@Override
public void close() {
  try {
    if (debug) {
      log.debug("Closing tlog {}", this);
    }

    synchronized (this) {
      if (fos != null) {
        try {
          fos.flush();
          fos.close();
        } catch (IOException | RuntimeException e) {
          // The stream may not have reached the point where it closes the underlying file.
          // Close the file directly; closing an already closed file has no effect.
          if (raf != null) {
            try {
              raf.close();
            } catch (IOException closeFailure) {
              e.addSuppressed(closeFailure);
            }
          }
          throw e;
        } finally {
          // dereference these variables for GC
          fos = null;
          os = null;
          channel = null;
          raf = null;
        }
      }
    }

    if (deleteOnClose) {
      try {
        Files.deleteIfExists(tlogFile.toPath());
      } catch (IOException e) {
        // Deletion stays best-effort (previously only SecurityException would be handled),
        // but the failure is no longer silent.
        log.warn("Could not delete tlog file {}", tlogFile, e);
      }
    }
  } catch (IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  } finally {
    assert ObjectReleaseTracker.release(this);
  }
}
/**
 * Re-open the output stream of the tlog and position
 * the file pointer at the end of the file. It assumes
 * that the tlog is non-empty and that the tlog's header
 * has been already read.
 *
 * <p>If reopening fails after the file has been opened, the newly opened file is closed again and
 * no reference to it is kept, so that the file handle is not leaked.
 *
 * @throws SolrException with {@code SERVER_ERROR} if the file cannot be opened or positioned
 */
synchronized void reopenOutputStream() {
  RandomAccessFile reopened = null;
  try {
    if (debug) {
      log.debug("Re-opening tlog's output stream: {}", this);
    }

    reopened = new RandomAccessFile(this.tlogFile, "rw");
    long start = reopened.length();
    reopened.seek(start);

    // Publish the new file only once it has been opened and positioned successfully.
    raf = reopened;
    channel = reopened.getChannel();
    os = Channels.newOutputStream(channel);
    fos = new FastOutputStream(os, new byte[65536], 0);
    fos.setWritten(start); // reflect that we aren't starting at the beginning
  } catch (IOException e) {
    if (reopened != null) {
      try {
        reopened.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      if (raf == reopened) {
        // Do not keep references to the file that has just been closed.
        fos = null;
        os = null;
        channel = null;
        raf = null;
      }
    }
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
  }
}
}