blob: 0eef7325042d3df4ee8c87737a6c96caeab0ab94 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.io;
import java.io.IOException;
import java.io.InputStream;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.util.E;
public class BufferedStreamInput extends UnsafeBytesInput {
private final int bufferCapacity;
private final InputStream input;
private long inputOffset;
public BufferedStreamInput(InputStream input) throws IOException {
this(input, Constants.BIG_BUF_SIZE);
}
public BufferedStreamInput(InputStream input, int bufferCapacity)
throws IOException {
super(new byte[bufferCapacity], 0);
E.checkArgument(bufferCapacity >= 8,
"The parameter bufferSize must be >= 8");
this.input = input;
this.bufferCapacity = bufferCapacity;
this.shiftAndFillBuffer();
}
@Override
public long position() {
return this.inputOffset - super.remaining();
}
@Override
public void readFully(byte[] b) throws IOException {
this.readFully(b, 0, b.length);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
if (len <= super.remaining()) {
super.readFully(b, off, len);
} else if (len <= this.bufferCapacity) {
this.shiftAndFillBuffer();
super.readFully(b, off, len);
} else {
int remaining = super.remaining();
super.readFully(b, off, remaining);
int expectedLen = len - remaining;
while (expectedLen > 0) {
int readLen = this.input.read(b, off + remaining, expectedLen);
if (readLen == -1) {
throw new IOException("There is no enough data in input " +
"stream");
}
expectedLen -= readLen;
}
this.inputOffset += len;
}
}
@Override
public void seek(long position) throws IOException {
long bufferStart = this.inputOffset - this.limit();
if (position >= bufferStart && position < this.inputOffset) {
super.seek(position - bufferStart);
return;
}
/*
* The reason for seeking beyond the current buffer location is that
* the user may need to skip unread data and know the offset of the
* required data.
*/
if (position >= this.inputOffset) {
int skipLen = (int) (position - this.inputOffset);
this.inputOffset += skipLen;
byte[] buffer = this.buffer();
while (skipLen > 0) {
int expectLen = Math.min(skipLen, this.bufferCapacity);
int readLen = this.input.read(buffer, 0, expectLen);
if (readLen == -1) {
throw new IOException(String.format(
"Can't seek at position %s, reach the end of " +
"input stream",
position));
}
skipLen -= expectLen;
}
super.seek(0);
super.limit(0);
this.fillBuffer();
} else {
throw new IOException(String.format(
"The seek position %s underflows the start position " +
"%s of the buffer",
position, this.inputOffset - this.limit()));
}
}
@Override
public long skip(long bytesToSkip) throws IOException {
E.checkArgument(bytesToSkip >= 0L,
"The parameter bytesToSkip must be >= 0, but got %s",
bytesToSkip);
long positionBeforeSkip = this.position();
if (bytesToSkip <= this.remaining()) {
super.skip(bytesToSkip);
return positionBeforeSkip;
} else {
bytesToSkip -= this.remaining();
long position = this.inputOffset + bytesToSkip;
this.seek(position);
return positionBeforeSkip;
}
}
@Override
public void close() throws IOException {
this.input.close();
}
@Override
protected void require(int size) throws IOException {
if (this.remaining() >= size) {
return;
}
this.shiftAndFillBuffer();
/*
* The buffer capacity must be >= 8, read primitive data like int,
* long, float, double can be read from buffer. Only read bytes may
* exceed the limit, and read bytes using readFully is overrode
* in this class. In conclusion, the required data can be
* supplied after shiftAndFillBuffer.
*/
if (size > this.limit()) {
throw new IOException(String.format(
"Read %s bytes from position %s overflows buffer %s",
size, this.position(), this.limit()));
}
}
/**
* The stream has no limit.
*/
@Override
public long available() throws IOException {
return Long.MAX_VALUE;
}
private void shiftAndFillBuffer() throws IOException {
this.shiftBuffer();
this.fillBuffer();
}
private void fillBuffer() throws IOException {
int expectLen = this.bufferCapacity - this.limit();
int readLen = this.input.read(this.buffer(), this.limit(), expectLen);
if (readLen > 0) {
this.limit(this.limit() + readLen);
this.inputOffset += readLen;
}
}
}