/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.DataInputInputStream;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Log Format: List{Operation, Version, ...}
* ADD, VERSION, DOC
* DELETE, VERSION, ID_BYTES
* DELETE_BY_QUERY, VERSION, String
*
* TODO: keep two files, one for [operation, version, id] and the other for the actual
* document data. That way we could throw away document log files more readily
* while retaining the smaller operation log files longer (and we can retrieve
* the stored fields of the latest documents from the index).
*
* This would require keeping all source fields stored, of course.
*
* This would also allow us to skip logging document data for requests with commit=true
* in them (since we know that if the request succeeds, all docs will be committed).
*
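* A rough usage sketch (addCmd/commitCmd below are placeholder command objects; creation
* and lifecycle of the log itself are normally managed by UpdateLog):
* <pre>
*   long pos = tlog.write(addCmd);       // append an ADD record, returns its position
*   Object record = tlog.lookup(pos);    // random-access read (used for real-time get)
*   tlog.writeCommit(commitCmd);         // append COMMIT + END_MESSAGE trailer
*   tlog.decref();                       // release; the log is closed when refcount hits 0
* </pre>
*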
*/
public class TransactionLog implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean debug = log.isDebugEnabled();
private boolean trace = log.isTraceEnabled();
public final static String END_MESSAGE = "SOLR_TLOG_END";
long id;
File tlogFile;
RandomAccessFile raf;
FileChannel channel;
OutputStream os;
FastOutputStream fos; // all accesses to this stream should be synchronized on "this" (The TransactionLog)
int numRecords;
boolean isBuffer;
protected volatile boolean deleteOnClose = true; // we can delete old tlogs since they are currently only used for real-time-get (and in the future, recovery)
AtomicInteger refcount = new AtomicInteger(1);
Map<String, Integer> globalStringMap = new HashMap<>();
List<String> globalStringList = new ArrayList<>();
// write a BytesRef as a byte array
static final JavaBinCodec.ObjectResolver resolver = new JavaBinCodec.ObjectResolver() {
@Override
public Object resolve(Object o, JavaBinCodec codec) throws IOException {
if (o instanceof BytesRef) {
BytesRef br = (BytesRef) o;
codec.writeByteArray(br.bytes, br.offset, br.length);
return null;
}
// Fallback: we have no idea how to serialize this. Be noisy to prevent insidious bugs
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"TransactionLog doesn't know how to serialize " + o.getClass() + "; try implementing ObjectResolver?");
}
};
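/**
* JavaBinCodec specialization for tlog records: known field names are written as compact
* extern-string references against this log's global string dictionary, and
* java.util.UUID values are serialized as two longs.
*/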
public class LogCodec extends JavaBinCodec {
public LogCodec(JavaBinCodec.ObjectResolver resolver) {
super(resolver);
}
@Override
public void writeExternString(CharSequence s) throws IOException {
if (s == null) {
writeTag(NULL);
return;
}
// no need to synchronize globalStringMap - it's only updated before the first record is written to the log
Integer idx = globalStringMap.get(s.toString());
if (idx == null) {
// write a normal string
writeStr(s);
} else {
// write the extern string
writeTag(EXTERN_STRING, idx);
}
}
@Override
public CharSequence readExternString(DataInputInputStream fis) throws IOException {
int idx = readSize(fis);
if (idx != 0) {// idx != 0 is the index of the extern string
// no need to synchronize globalStringList - it's only updated before the first record is written to the log
return globalStringList.get(idx - 1);
} else {// idx == 0 means it has a string value
// this shouldn't happen with this codec subclass.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Corrupt transaction log");
}
}
@Override
protected Object readObject(DataInputInputStream dis) throws IOException {
if (UUID == tagByte) {
return new java.util.UUID(dis.readLong(), dis.readLong());
}
return super.readObject(dis);
}
@Override
public boolean writePrimitive(Object val) throws IOException {
if (val instanceof java.util.UUID) {
java.util.UUID uuid = (java.util.UUID) val;
daos.writeByte(UUID);
daos.writeLong(uuid.getMostSignificantBits());
daos.writeLong(uuid.getLeastSignificantBits());
return true;
}
return super.writePrimitive(val);
}
}
TransactionLog(File tlogFile, Collection<String> globalStrings) {
this(tlogFile, globalStrings, false);
}
TransactionLog(File tlogFile, Collection<String> globalStrings, boolean openExisting) {
boolean success = false;
try {
if (debug) {
log.debug("New TransactionLog file= {}, exists={}, size={} openExisting={}"
, tlogFile, tlogFile.exists(), tlogFile.length(), openExisting);
}
// Parse tlog id from the filename
String filename = tlogFile.getName();
id = Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1));
this.tlogFile = tlogFile;
raf = new RandomAccessFile(this.tlogFile, "rw");
long start = raf.length();
channel = raf.getChannel();
os = Channels.newOutputStream(channel);
fos = new FastOutputStream(os, new byte[65536], 0);
// fos = FastOutputStream.wrap(os);
if (openExisting) {
if (start > 0) {
readHeader(null);
raf.seek(start);
assert channel.position() == start;
fos.setWritten(start); // reflect that we aren't starting at the beginning
assert fos.size() == channel.size();
} else {
addGlobalStrings(globalStrings);
}
} else {
if (start > 0) {
log.warn("New transaction log already exists: {} size={}", tlogFile, raf.length());
return;
}
addGlobalStrings(globalStrings);
}
success = true;
assert ObjectReleaseTracker.track(this);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} finally {
if (!success && raf != null) {
try {
raf.close();
} catch (Exception e) {
log.error("Error closing tlog file (after error opening)", e);
}
}
}
}
// for subclasses
protected TransactionLog() {
}
/** Returns the number of records in the log (currently includes the header and an optional commit).
* Note: currently returns 0 for reopened existing log files.
*/
public int numRecords() {
synchronized (this) {
return this.numRecords;
}
}
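/** Returns true if the log ends with a commit record, i.e. the END_MESSAGE trailer written
* by {@link #writeCommit(CommitUpdateCommand)} followed by the 4-byte record size.
*/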
public boolean endsWithCommit() throws IOException {
long size;
synchronized (this) {
fos.flush();
size = fos.size();
}
// the end of the file should have the end message (added during a commit) plus a 4 byte size
byte[] buf = new byte[END_MESSAGE.length()];
long pos = size - END_MESSAGE.length() - 4;
if (pos < 0) return false;
@SuppressWarnings("resource") final ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
is.read(buf);
for (int i = 0; i < buf.length; i++) {
if (buf[i] != END_MESSAGE.charAt(i)) return false;
}
return true;
}
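/** Writes an arbitrary object to the log using the LogCodec and returns the position the
* record starts at. Note that, unlike the other write methods, no size trailer is appended.
*/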
public long writeData(Object o) {
@SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
codec.init(fos);
codec.writeVal(o);
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
@SuppressWarnings({"unchecked"})
private void readHeader(FastInputStream fis) throws IOException {
// read existing header
fis = fis != null ? fis : new ChannelFastInputStream(channel, 0);
@SuppressWarnings("resource") final LogCodec codec = new LogCodec(resolver);
@SuppressWarnings({"rawtypes"})
Map header = (Map) codec.unmarshal(fis);
fis.readInt(); // skip size
// needed to read other records
synchronized (this) {
globalStringList = (List<String>) header.get("strings");
globalStringMap = new HashMap<>(globalStringList.size());
for (int i = 0; i < globalStringList.size(); i++) {
globalStringMap.put(globalStringList.get(i), i + 1);
}
}
}
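// Adds any new strings to the extern-string dictionary (globalStringList/globalStringMap)
// so the LogCodec can write them as compact EXTERN_STRING references.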
protected void addGlobalStrings(Collection<String> strings) {
if (strings == null) return;
int origSize = globalStringMap.size();
for (String s : strings) {
Integer idx = null;
if (origSize > 0) {
idx = globalStringMap.get(s);
}
if (idx != null) continue; // already in list
globalStringList.add(s);
globalStringMap.put(s, globalStringList.size());
}
assert globalStringMap.size() == globalStringList.size();
}
Collection<String> getGlobalStrings() {
synchronized (this) {
return new ArrayList<>(globalStringList);
}
}
@SuppressWarnings({"unchecked"})
protected void writeLogHeader(LogCodec codec) throws IOException {
long pos = fos.size();
assert pos == 0;
@SuppressWarnings({"rawtypes"})
Map header = new LinkedHashMap<String, Object>();
header.put("SOLR_TLOG", 1); // a magic string + version number
header.put("strings", globalStringList);
codec.marshal(header, fos);
endRecord(pos);
}
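// Appends the 4-byte size of the record that started at startRecordPosition; this trailer
// is what lets the ReverseReader walk the log backwards.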
protected void endRecord(long startRecordPosition) throws IOException {
fos.writeInt((int) (fos.size() - startRecordPosition));
numRecords++;
}
protected void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
// Unsynchronized access. We can get away with an unsynchronized access here
// since we will never get a false non-zero when the position is in fact 0.
// rollback() is the only function that can reset to zero, and it blocks updates.
if (fos.size() != 0) return;
synchronized (this) {
if (fos.size() != 0) return; // check again while synchronized
if (optional != null) {
addGlobalStrings(optional.getFieldNames());
}
writeLogHeader(codec);
}
}
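// size of the last serialized add record; used to pre-size the buffer for the next add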
int lastAddSize;
/**
* Writes an add update command to the transaction log. This is not applicable for
* in-place updates; use {@link #write(AddUpdateCommand, long)}.
* (The previous pointer (applicable for in-place updates) is set to -1 while writing
* the command to the transaction log.)
* @param cmd The add update command to be written
* @return Returns the position pointer of the written update command
*
* @see #write(AddUpdateCommand, long)
*/
public long write(AddUpdateCommand cmd) {
return write(cmd, -1);
}
/**
* Writes an add update command to the transaction log. For in-place updates, pass the
* position of the update this command depends on as prevPointer; otherwise pass -1.
* @param cmd The add update command to be written
* @param prevPointer The pointer in the transaction log which this update depends
* on (applicable for in-place updates)
* @return Returns the position pointer of the written update command
*/
public long write(AddUpdateCommand cmd, long prevPointer) {
assert (-1 <= prevPointer && (cmd.isInPlaceUpdate() || (-1 == prevPointer)));
LogCodec codec = new LogCodec(resolver);
SolrInputDocument sdoc = cmd.getSolrInputDocument();
try {
checkWriteHeader(codec, sdoc);
// adaptive buffer sizing
int bufSize = lastAddSize; // unsynchronized access of lastAddSize should be fine
// at least 256 bytes and at most 1 MB
bufSize = Math.min(1024 * 1024, Math.max(256, bufSize + (bufSize >> 3) + 256));
MemOutputStream out = new MemOutputStream(new byte[bufSize]);
codec.init(out);
if (cmd.isInPlaceUpdate()) {
codec.writeTag(JavaBinCodec.ARR, 5);
codec.writeInt(UpdateLog.UPDATE_INPLACE); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeLong(prevPointer);
codec.writeLong(cmd.prevVersion);
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
} else {
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.ADD); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeSolrInputDocument(cmd.getSolrInputDocument());
}
lastAddSize = (int) out.size();
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
/***
System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
if (pos != fos.size()) {
throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
}
***/
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
// TODO: reset our file pointer back to "pos", the start of this record.
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
}
}
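/** Writes a delete-by-id command to the transaction log.
* @return the position of the written record
*/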
public long writeDelete(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec(resolver);
try {
checkWriteHeader(codec, null);
BytesRef br = cmd.getIndexedId();
MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
codec.init(out);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeByteArray(br.bytes, br.offset, br.length);
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
assert pos != 0;
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
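/** Writes a delete-by-query command to the transaction log.
* @return the position of the written record
*/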
public long writeDeleteByQuery(DeleteUpdateCommand cmd) {
LogCodec codec = new LogCodec(resolver);
try {
checkWriteHeader(codec, null);
MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
codec.init(out);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query);
synchronized (this) {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
out.writeAll(fos);
endRecord(pos);
// fos.flushBuffer(); // flush later
return pos;
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
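/** Writes a commit record (including the END_MESSAGE trailer) and flushes the stream, since a
* commit is normally the last record in a log file.
* @return the position of the written record
*/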
public long writeCommit(CommitUpdateCommand cmd) {
LogCodec codec = new LogCodec(resolver);
synchronized (this) {
try {
long pos = fos.size(); // if we had flushed, this should be equal to channel.position()
if (pos == 0) {
writeLogHeader(codec);
pos = fos.size();
}
codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.COMMIT); // should just take one byte
codec.writeLong(cmd.getVersion());
codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file
endRecord(pos);
fos.flush(); // flush since this will be the last record in a log file
assert fos.size() == channel.size();
return pos;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
}
/* This method is thread safe */
public Object lookup(long pos) {
// A negative position can result from a log replay (which does not re-log, but does
// update the version map). This is OK since the node won't be ACTIVE when this happens.
if (pos < 0) return null;
try {
// make sure any unflushed buffer has been flushed
synchronized (this) {
// TODO: optimize this by keeping track of what we have flushed up to
fos.flushBuffer();
/***
System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
if (fos.size() != raf.length() || pos >= fos.size() ) {
throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
}
***/
}
ChannelFastInputStream fis = new ChannelFastInputStream(channel, pos);
try (LogCodec codec = new LogCodec(resolver)) {
return codec.readVal(fis);
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
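// Reference counting: a TransactionLog starts with a refcount of 1; readers take an extra
// reference for the duration of their use, and close() runs when the count drops to zero.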
public void incref() {
int result = refcount.incrementAndGet();
if (result <= 1) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "incref on a closed log: " + this);
}
}
public boolean try_incref() {
return refcount.incrementAndGet() > 1;
}
public void decref() {
if (refcount.decrementAndGet() == 0) {
close();
}
}
/** returns the current position in the log file */
public long position() {
synchronized (this) {
return fos.size();
}
}
/** Move to a read-only state, closing and releasing resources while keeping the log available for reads */
public void closeOutput() {
}
public void finish(UpdateLog.SyncLevel syncLevel) {
if (syncLevel == UpdateLog.SyncLevel.NONE) return;
try {
synchronized (this) {
fos.flushBuffer();
}
if (syncLevel == UpdateLog.SyncLevel.FSYNC) {
// Since the fsync is outside of the synchronized block, we can end up with a partial
// last record on power failure (which is OK, and does not represent an error...
// we just need to be aware of it when reading).
raf.getFD().sync();
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
}
public void close() {
try {
if (debug) {
log.debug("Closing tlog {}", this);
}
synchronized (this) {
fos.flush();
fos.close();
}
if (deleteOnClose) {
try {
Files.deleteIfExists(tlogFile.toPath());
} catch (IOException e) {
// TODO: should this class care if a file couldn't be deleted?
// this just emulates previous behavior, where only SecurityException would be handled.
}
}
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} finally {
assert ObjectReleaseTracker.release(this);
}
}
public void forceClose() {
if (refcount.get() > 0) {
log.error("Error: Forcing close of {}", this);
refcount.set(0);
close();
}
}
@Override
public String toString() {
return "tlog{file=" + tlogFile.toString() + " refcount=" + refcount.get() + "}";
}
public long getLogSize() {
if (tlogFile != null) {
return tlogFile.length();
}
return 0;
}
/**
* @return the FastOutputStream size
*/
public synchronized long getLogSizeFromStream() {
return fos.size();
}
/** Returns a reader that can be used while a log is still in use.
* Currently only *one* LogReader may be outstanding, and that reader may only
* be used from a single thread.
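*
* A sketch of typical iteration (variable names are illustrative):
* <pre>
*   TransactionLog.LogReader reader = tlog.getReader(0);
*   try {
*     Object rec;
*     while ((rec = reader.next()) != null) {
*       // each record is a List: [operation, version, ...]
*     }
*   } finally {
*     reader.close();
*   }
* </pre>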
*/
public LogReader getReader(long startingPos) {
return new LogReader(startingPos);
}
public LogReader getSortedReader(long startingPos) {
return new SortedLogReader(startingPos);
}
/** Returns a single threaded reverse reader */
public ReverseReader getReverseReader() throws IOException {
return new FSReverseReader();
}
public class LogReader {
protected ChannelFastInputStream fis;
private LogCodec codec = new LogCodec(resolver);
public LogReader(long startingPos) {
incref();
fis = new ChannelFastInputStream(channel, startingPos);
}
// for classes that extend
protected LogReader() {}
/** Returns the next object from the log, or null if none available.
*
* @return The log record, or null if EOF
* @throws IOException If there is a low-level I/O error.
*/
public Object next() throws IOException, InterruptedException {
long pos = fis.position();
synchronized (TransactionLog.this) {
if (trace) {
log.trace("Reading log record. pos={} currentSize={}", pos, fos.size());
}
if (pos >= fos.size()) {
return null;
}
fos.flushBuffer();
}
if (pos == 0) {
readHeader(fis);
// shouldn't currently happen - header and first record are currently written at the same time
synchronized (TransactionLog.this) {
if (fis.position() >= fos.size()) {
return null;
}
pos = fis.position();
}
}
Object o = codec.readVal(fis);
// skip over record size
int size = fis.readInt();
assert size == fis.position() - pos - 4;
return o;
}
public void close() {
decref();
}
@Override
public String toString() {
synchronized (TransactionLog.this) {
return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
}
}
// returns best effort current position
// for info purposes
public long currentPos() {
return fis.position();
}
// returns best effort current size
// for info purposes
public long currentSize() throws IOException {
return channel.size();
}
}
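/** A LogReader that returns records in version order. On the first call to next() it scans the
* log to build a version-to-position map; if the records turn out to already be in order, it
* falls back to plain sequential reads.
*/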
public class SortedLogReader extends LogReader {
private long startingPos;
private boolean inOrder = true;
private TreeMap<Long, Long> versionToPos;
Iterator<Long> iterator;
public SortedLogReader(long startingPos) {
super(startingPos);
this.startingPos = startingPos;
}
@Override
public Object next() throws IOException, InterruptedException {
if (versionToPos == null) {
versionToPos = new TreeMap<>();
Object o;
long pos = startingPos;
long lastVersion = Long.MIN_VALUE;
while ((o = super.next()) != null) {
@SuppressWarnings({"rawtypes"})
List entry = (List) o;
long version = (Long) entry.get(UpdateLog.VERSION_IDX);
version = Math.abs(version);
versionToPos.put(version, pos);
pos = currentPos();
if (version < lastVersion) inOrder = false;
lastVersion = version;
}
fis.seek(startingPos);
}
if (inOrder) {
return super.next();
} else {
if (iterator == null) iterator = versionToPos.values().iterator();
if (!iterator.hasNext()) return null;
long pos = iterator.next();
if (pos != currentPos()) fis.seek(pos);
return super.next();
}
}
}
public abstract class ReverseReader {
/** Returns the next object from the log, or null if none available.
*
* @return The log record, or null if EOF
* @throws IOException If there is a low-level I/O error.
*/
public abstract Object next() throws IOException;
/* returns the position in the log file of the last record returned by next() */
public abstract long position();
public abstract void close();
@Override
public abstract String toString();
}
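/** A ReverseReader over the on-disk log. It walks backwards using the 4-byte size trailer that
* follows every record, and skips deserializing SolrInputDocument bodies since they are the
* last element of an add record.
*/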
public class FSReverseReader extends ReverseReader {
ChannelFastInputStream fis;
private LogCodec codec = new LogCodec(resolver) {
@Override
public SolrInputDocument readSolrInputDocument(DataInputInputStream dis) {
// Given that the SolrInputDocument is last in an add record, it's OK to just skip
// reading it completely.
return null;
}
};
int nextLength; // length of the next record (the next one closer to the start of the log file)
long prevPos; // where we started reading from last time (so prevPos - nextLength == start of next record)
public FSReverseReader() throws IOException {
incref();
long sz;
synchronized (TransactionLog.this) {
fos.flushBuffer();
sz = fos.size();
assert sz == channel.size();
}
fis = new ChannelFastInputStream(channel, 0);
if (sz >= 4) {
// readHeader(fis); // should not be needed
prevPos = sz - 4;
fis.seek(prevPos);
nextLength = fis.readInt();
}
}
/** Returns the next object from the log, or null if none available.
*
* @return The log record, or null if EOF
* @throws IOException If there is a low-level I/O error.
*/
public Object next() throws IOException {
if (prevPos <= 0) return null;
long endOfThisRecord = prevPos;
int thisLength = nextLength;
long recordStart = prevPos - thisLength; // back up to the beginning of the next record
prevPos = recordStart - 4; // back up 4 more to read the length of the next record
if (prevPos <= 0) return null; // this record is the header
long bufferPos = fis.getBufferPos();
if (prevPos >= bufferPos) {
// nothing to do... we're within the current buffer
} else {
// Position buffer so that this record is at the end.
// For small records, this will cause subsequent calls to next() to be within the buffer.
long seekPos = endOfThisRecord - fis.getBufferSize();
seekPos = Math.min(seekPos, prevPos); // seek to the start of the record if it's larger than the block size.
seekPos = Math.max(seekPos, 0);
fis.seek(seekPos);
fis.peek(); // cause buffer to be filled
}
fis.seek(prevPos);
nextLength = fis.readInt(); // this is the length of the *next* record (i.e. closer to the beginning)
// TODO: optionally skip document data
Object o = codec.readVal(fis);
// assert fis.position() == prevPos + 4 + thisLength; // this is only true if we read all the data (and we currently skip reading the SolrInputDocument)
return o;
}
/* returns the position in the log file of the last record returned by next() */
public long position() {
return prevPos + 4; // skip the length
}
public void close() {
decref();
}
@Override
public String toString() {
synchronized (TransactionLog.this) {
return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
}
}
}
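/** A FastInputStream that reads from a FileChannel with positional reads, so multiple readers
* can share the channel without disturbing its position; seek() repositions cheaply when the
* target is still inside the current buffer.
*/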
static class ChannelFastInputStream extends FastInputStream {
private FileChannel ch;
public ChannelFastInputStream(FileChannel ch, long chPosition) {
// super(null, new byte[10],0,0); // a small buffer size for testing purposes
super(null);
this.ch = ch;
super.readFromStream = chPosition;
}
@Override
public int readWrappedStream(byte[] target, int offset, int len) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(target, offset, len);
int ret = ch.read(bb, readFromStream);
return ret;
}
public void seek(long position) throws IOException {
if (position <= readFromStream && position >= getBufferPos()) {
// seek within buffer
pos = (int) (position - getBufferPos());
} else {
// long currSize = ch.size(); // not needed - underlying read should handle (unless read never done)
// if (position > currSize) throw new EOFException("Read past EOF: seeking to " + position + " on file of size " + currSize + " file=" + ch);
readFromStream = position;
end = pos = 0;
}
assert position() == position;
}
/** Returns the start position of the buffer relative to the whole file. */
public long getBufferPos() {
return readFromStream - end;
}
public int getBufferSize() {
return buf.length;
}
@Override
public void close() throws IOException {
ch.close();
}
@Override
public String toString() {
return "readFromStream=" + readFromStream + " pos=" + pos + " end=" + end + " bufferPos=" + getBufferPos() + " position=" + position();
}
}
}