blob: c5a810fffb9ce6d41428aec5106098f985451493 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.fs;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.InputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.hadoop.fs.FSInputStream;
/**
* The input stream for a distributedlog stream.
*/
@Slf4j
class DLInputStream extends FSInputStream {
private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
private static class RecordStream {
private final InputStream payloadStream;
private final LogRecordWithDLSN record;
RecordStream(LogRecordWithDLSN record) {
checkNotNull(record);
this.record = record;
this.payloadStream = record.getPayLoadInputStream();
}
}
private static RecordStream nextRecordStream(LogReader reader) throws IOException {
LogRecordWithDLSN record = reader.readNext(false);
if (null != record) {
return new RecordStream(record);
}
return null;
}
private final DistributedLogManager dlm;
private LogReader reader;
private long pos;
private long lastPos;
private RecordStream currentRecord = null;
DLInputStream(DistributedLogManager dlm,
LogReader reader,
long startPos)
throws IOException {
this.dlm = dlm;
this.reader = reader;
this.pos = startPos;
this.lastPos = readEndPos();
seek(startPos);
}
@Override
public void close() throws IOException {
reader.close();
dlm.close();
}
private long readEndPos() throws IOException {
return dlm.getLastTxId();
}
//
// FSInputStream
//
@Override
public void seek(long pos) throws IOException {
if (this.pos == pos) {
return;
}
if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
// close the previous reader
this.reader.close();
this.reader = dlm.openLogReader(pos);
this.currentRecord = null;
}
skipTo(pos);
}
private boolean skipTo(final long position) throws IOException {
while (true) {
if (null == currentRecord) {
currentRecord = nextRecordStream(reader);
}
if (null == currentRecord) { // the stream is empty now
return false;
}
long endPos = currentRecord.record.getTransactionId();
if (endPos < position) {
currentRecord = nextRecordStream(reader);
this.pos = endPos;
continue;
} else if (endPos == position){
// find the record, but we defer read next record when actual read happens
this.pos = position;
this.currentRecord = null;
return true;
} else {
this.currentRecord.payloadStream.skip(
this.currentRecord.payloadStream.available() - (endPos - position));
this.pos = position;
return true;
}
}
}
@Override
public long getPos() throws IOException {
return this.pos;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
//
// Input Stream
//
@Override
public int read(byte[] b, final int off, final int len) throws IOException {
int remaining = len;
int numBytesRead = 0;
while (remaining > 0) {
if (null == currentRecord) {
currentRecord = nextRecordStream(reader);
}
if (null == currentRecord) {
if (numBytesRead == 0) {
return -1;
}
break;
}
int bytesLeft = currentRecord.payloadStream.available();
if (bytesLeft <= 0) {
currentRecord.payloadStream.close();
currentRecord = null;
continue;
}
int numBytesToRead = Math.min(bytesLeft, remaining);
int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
if (numBytes < 0) {
continue;
}
numBytesRead += numBytes;
remaining -= numBytes;
}
return numBytesRead;
}
@Override
public long skip(final long n) throws IOException {
if (n <= 0L) {
return 0L;
}
long remaining = n;
while (true) {
if (null == currentRecord) {
currentRecord = nextRecordStream(reader);
}
if (null == currentRecord) { // end of stream
return n - remaining;
}
int bytesLeft = currentRecord.payloadStream.available();
long endPos = currentRecord.record.getTransactionId();
if (remaining > bytesLeft) {
// skip the whole record
remaining -= bytesLeft;
this.pos = endPos;
this.currentRecord = nextRecordStream(reader);
continue;
} else if (remaining == bytesLeft) {
this.pos = endPos;
this.currentRecord = null;
return n;
} else {
currentRecord.payloadStream.skip(remaining);
this.pos = endPos - currentRecord.payloadStream.available();
return n;
}
}
}
@Override
public int available() throws IOException {
if (lastPos - pos == 0L) {
lastPos = readEndPos();
}
return (int) (lastPos - pos);
}
@Override
public boolean markSupported() {
return false;
}
@Override
public int read() throws IOException {
byte[] data = new byte[1];
int numBytes = read(data);
if (numBytes <= 0) {
return -1;
}
return data[0];
}
}