// blob: 42943721d26a919347f503bd6a44a9efcb679e8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
/**
* The base class for all input views that are backed by multiple memory pages. This base class contains all
* decoding methods to read data from a page and to detect page boundary crossings. The concrete subclasses must
* implement the methods that provide the next memory page once a boundary is crossed.
*/
public abstract class AbstractPagedInputView implements DataInputView {
// the segment that bytes are currently read from; null while the view is not bound to a segment (see clear())
private MemorySegment currentSegment;
protected final int headerLength; // the number of bytes to skip at the beginning of each segment
private int positionInSegment; // the offset in the current segment from which the next byte is read
private int limitInSegment; // the limit in the current segment (one past the last readable byte) before switching to the next
private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding, allocated/grown on demand by readUTF()
private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding, allocated/grown on demand by readUTF()
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
/**
 * Creates a view that is bound to the given segment right away. Reading begins immediately behind the
 * segment's header, or at offset zero when the header length is zero. Bytes are served from the initial
 * segment until {@code initialLimit} is reached; after that the view has to move on to the next segment.
 *
 * @param initialSegment The memory segment from which the first bytes are read.
 * @param initialLimit The position one past the last valid byte in the initial segment.
 * @param headerLength The number of header bytes skipped at the start of every segment. The value must
 *                     be identical for all memory segments.
 */
protected AbstractPagedInputView(MemorySegment initialSegment, int initialLimit, int headerLength) {
    this.headerLength = headerLength;
    this.positionInSegment = headerLength;
    // bind the view to the first segment; reading starts directly behind its header
    seekInput(initialSegment, headerLength, initialLimit);
}
/**
 * Creates a view without an initial memory segment. This variant is meant for views that always
 * seek before they read.
 *
 * <p>WARNING: Nothing can be read from the view before either {@link #advance()} or
 * {@link #seekInput(MemorySegment, int, int)} has been called for the first time.
 *
 * @param headerLength The number of header bytes skipped at the start of every segment.
 */
protected AbstractPagedInputView(int headerLength) {
    // segment, position and limit keep their default values until the first advance/seek
    this.headerLength = headerLength;
}
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------
/**
 * Returns the memory segment from which the next bytes are going to be read. Note that when the segment
 * is exactly exhausted (the last byte read was the last one available in it), the next bytes will not
 * come from this segment but from the one supplied by {@link #nextSegment(MemorySegment)}.
 *
 * @return The current memory segment.
 */
public MemorySegment getCurrentSegment() {
    return currentSegment;
}
/**
 * Returns the offset within the current segment at which the next byte will be read. When this offset
 * equals the current limit, the next byte is taken from the following segment instead.
 *
 * @return The position from which the next byte will be read.
 * @see #getCurrentSegmentLimit()
 */
public int getCurrentPositionInSegment() {
    return positionInSegment;
}
/**
 * Returns the read limit of the current memory segment, i.e. the position one past the last valid
 * byte in that segment.
 *
 * @return The current limit in the memory segment.
 * @see #getCurrentPositionInSegment()
 */
public int getCurrentSegmentLimit() {
    return limitInSegment;
}
/**
 * Recomputes the read limit of the current segment by querying
 * {@link #getLimitForSegment(MemorySegment)} again and stores the result as the new limit.
 * The read position is left untouched.
 */
public void updateCurrentSegmentLimit() {
    final MemorySegment segment = this.currentSegment;
    this.limitInSegment = getLimitForSegment(segment);
}
/**
 * The method by which concrete subclasses realize page crossing. It is invoked by {@link #advance()}
 * with the segment currently held by the view, when the current page is exhausted and a new page is
 * required to continue the reading. If no further page is available, this method must throw an
 * {@link EOFException}.
 *
 * @param current The current page that was read to its limit. May be {@code null}, if this method is
 *                invoked for the first time (the view is not yet bound to a segment).
 * @return The next page from which the reading should continue. May not be {@code null}. If the input is
 *         exhausted, an {@link EOFException} must be thrown instead.
 *
 * @throws EOFException Thrown, if no further segment is available.
 * @throws IOException Thrown, if the method cannot provide the next page due to an I/O related problem.
 */
protected abstract MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException;
/**
 * Gets the limit for reading bytes from the given memory segment. This method must return the position
 * of the byte after the last valid byte in the given memory segment. When the position returned by this
 * method is reached, the view will attempt to switch to the next memory segment.
 *
 * <p>The method is queried by {@link #advance()} for every newly obtained segment and by
 * {@link #updateCurrentSegmentLimit()} for the current segment.
 *
 * @param segment The segment to determine the limit for.
 * @return The limit for the given memory segment.
 */
protected abstract int getLimitForSegment(MemorySegment segment);
/**
 * Moves the view on to the next memory segment. Reading continues directly behind the header of that
 * segment. The next segment is obtained from {@link #nextSegment(MemorySegment)} and its limit from
 * {@link #getLimitForSegment(MemorySegment)}; afterwards {@link #afterAdvance()} is invoked.
 *
 * @throws IOException Thrown, if the next segment could not be obtained.
 *
 * @see #nextSegment(MemorySegment)
 * @see #getLimitForSegment(MemorySegment)
 */
public final void advance() throws IOException {
    // Fetch the successor before touching any field: if nextSegment(...) throws (e.g. an
    // EOFException), the view stays at its previous position, so the EOF is reproducible
    // (provided nextSegment throws a reproducible EOFException).
    final MemorySegment next = nextSegment(this.currentSegment);
    this.currentSegment = next;
    this.limitInSegment = getLimitForSegment(next);
    this.positionInSegment = this.headerLength;
    afterAdvance();
}
/**
 * Hook that is invoked at the end of {@link #advance()}, after the current segment, its limit and the
 * read position have been updated. The default implementation does nothing; override it if you want
 * to use it.
 */
protected void afterAdvance() {}
/**
 * Repositions the view: subsequent reads are served from the given memory segment, beginning at the
 * given position, and the segment supplies bytes up to (excluding) the given limit.
 *
 * @param segment The segment to read the next bytes from.
 * @param positionInSegment The position in the segment to start reading from.
 * @param limitInSegment The limit in the segment. When reached, the view will attempt to switch to
 *                       the next segment.
 */
protected void seekInput(MemorySegment segment, int positionInSegment, int limitInSegment) {
    // the three assignments are independent of each other
    this.limitInSegment = limitInSegment;
    this.positionInSegment = positionInSegment;
    this.currentSegment = segment;
}
/**
 * Resets the view to an unbound state. Once called, every read attempt fails until either
 * {@link #advance()} or {@link #seekInput(MemorySegment, int, int)} has been invoked again.
 */
protected void clear() {
    // position == limit, so the next read has nothing left in the (now absent) segment
    final int start = this.headerLength;
    this.limitInSegment = start;
    this.positionInSegment = start;
    this.currentSegment = null;
}
// --------------------------------------------------------------------------------------------
// Data Input Specific methods
// --------------------------------------------------------------------------------------------
/**
 * Reads up to {@code b.length} bytes into the given array, starting at index zero.
 *
 * @param b The array to fill.
 * @return The number of bytes read, as reported by {@link #read(byte[], int, int)}.
 * @throws IOException Thrown, if advancing to a further segment fails.
 */
@Override
public int read(byte[] b) throws IOException {
    final int length = b.length;
    return read(b, 0, length);
}
/**
 * Reads up to {@code len} bytes into {@code b}, starting at index {@code off}, crossing segment
 * boundaries as needed.
 *
 * @param b The array to store the bytes in.
 * @param off The index in {@code b} at which the first byte is stored.
 * @param len The maximum number of bytes to read.
 * @return The number of bytes actually read. This is {@code len} unless the input ended early, in
 *         which case the number of bytes read so far is returned, or {@code -1} if the current
 *         segment was exhausted and no further segment was available before any byte could be read.
 * @throws IndexOutOfBoundsException Thrown, if {@code off} or {@code len} is negative, or if the range
 *                                   {@code [off, off + len)} does not fit into {@code b}.
 * @throws IOException Thrown, if advancing to the next segment fails for a reason other than EOF.
 */
@Override
public int read(byte[] b, int off, int len) throws IOException {
    // Overflow-safe variant of "off + len > b.length": for large off and len the sum wraps around
    // to a negative value and would incorrectly pass the check. Since off >= 0 is established
    // first, "b.length - off" cannot overflow.
    if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException();
    }

    int remaining = this.limitInSegment - this.positionInSegment;

    // fast path: the request is fully served by the current segment
    if (remaining >= len) {
        this.currentSegment.get(this.positionInSegment, b, off, len);
        this.positionInSegment += len;
        return len;
    }

    // the current segment is exactly exhausted: move on first, and signal EOF if nothing is left
    if (remaining == 0) {
        try {
            advance();
        } catch (EOFException eof) {
            return -1;
        }
        remaining = this.limitInSegment - this.positionInSegment;
    }

    int bytesRead = 0;
    while (true) {
        final int toRead = Math.min(remaining, len - bytesRead);
        this.currentSegment.get(this.positionInSegment, b, off, toRead);
        off += toRead;
        bytesRead += toRead;

        if (bytesRead >= len) {
            // request satisfied within this segment
            this.positionInSegment += toRead;
            return len;
        }

        try {
            advance();
        } catch (EOFException eof) {
            // no further segment: mark the consumed bytes as read and report the partial result
            this.positionInSegment += toRead;
            return bytesRead;
        }
        remaining = this.limitInSegment - this.positionInSegment;
    }
}
/**
 * Fills the entire given array with bytes from the view.
 *
 * @param b The array to fill completely.
 * @throws IOException Thrown by {@link #readFully(byte[], int, int)}, e.g. an EOFException if fewer
 *                     than {@code b.length} bytes are available.
 */
@Override
public void readFully(byte[] b) throws IOException {
    final int length = b.length;
    readFully(b, 0, length);
}
/**
 * Reads exactly {@code len} bytes into {@code b}, starting at index {@code off}.
 *
 * @param b The array to store the bytes in.
 * @param off The index in {@code b} at which the first byte is stored.
 * @param len The number of bytes to read.
 * @throws EOFException Thrown, if {@link #read(byte[], int, int)} delivered fewer than {@code len} bytes.
 * @throws IOException Thrown, if the underlying read fails.
 */
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
    final int bytesRead = read(b, off, len);
    if (bytesRead >= len) {
        return;
    }
    throw new EOFException("There is no enough data left in the DataInputView.");
}
/**
 * Reads one byte and interprets it as a boolean.
 *
 * <p>Following the {@link java.io.DataInput#readBoolean()} contract, any non-zero byte is
 * {@code true} and only a zero byte is {@code false}. (Comparing against {@code 1} would wrongly
 * report {@code false} for other non-zero values.)
 *
 * @return {@code true} if the byte read is non-zero, {@code false} if it is zero.
 * @throws IOException Thrown, if no byte could be read.
 */
@Override
public boolean readBoolean() throws IOException {
    return readByte() != 0;
}
/**
 * Reads a single byte. If the current segment has no byte left, the view advances to the next
 * segment and retries.
 *
 * @return The byte read.
 * @throws IOException Thrown, if advancing to the next segment fails (including EOF).
 */
@Override
public byte readByte() throws IOException {
    if (this.positionInSegment >= this.limitInSegment) {
        // current segment is used up: switch to the next one and try again
        advance();
        return readByte();
    }
    return this.currentSegment.get(this.positionInSegment++);
}
/**
 * Reads a single byte and returns it as an unsigned value.
 *
 * @return The byte read, in the range 0 to 255.
 * @throws IOException Thrown, if no byte could be read.
 */
@Override
public int readUnsignedByte() throws IOException {
    final byte raw = readByte();
    return 0xff & raw;
}
/**
 * Reads a big-endian {@code short}. If both bytes lie in the current segment they are read in one
 * access; if the segment is exhausted the view advances first; if the value straddles a segment
 * boundary it is assembled byte by byte.
 *
 * @return The short value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public short readShort() throws IOException {
    // both bytes available in the current segment
    if (this.positionInSegment < this.limitInSegment - 1) {
        final short value = this.currentSegment.getShortBigEndian(this.positionInSegment);
        this.positionInSegment += 2;
        return value;
    }

    // segment exactly exhausted: continue in the next one
    if (this.positionInSegment == this.limitInSegment) {
        advance();
        return readShort();
    }

    // value spans two segments: high byte first, then low byte
    final int high = readUnsignedByte();
    final int low = readUnsignedByte();
    return (short) ((high << 8) | low);
}
/**
 * Reads a big-endian unsigned 16 bit value. If both bytes lie in the current segment they are read
 * in one access; if the segment is exhausted the view advances first; if the value straddles a
 * segment boundary it is assembled byte by byte.
 *
 * @return The value read, in the range 0 to 65535.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public int readUnsignedShort() throws IOException {
    // both bytes available in the current segment
    if (this.positionInSegment < this.limitInSegment - 1) {
        final int value = this.currentSegment.getShortBigEndian(this.positionInSegment) & 0xffff;
        this.positionInSegment += 2;
        return value;
    }

    // segment exactly exhausted: continue in the next one
    if (this.positionInSegment == this.limitInSegment) {
        advance();
        return readUnsignedShort();
    }

    // value spans two segments: high byte first, then low byte
    final int high = readUnsignedByte();
    final int low = readUnsignedByte();
    return (high << 8) | low;
}
/**
 * Reads a big-endian {@code char}. If both bytes lie in the current segment they are read in one
 * access; if the segment is exhausted the view advances first; if the value straddles a segment
 * boundary it is assembled byte by byte.
 *
 * @return The char value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public char readChar() throws IOException {
    // both bytes available in the current segment
    if (this.positionInSegment < this.limitInSegment - 1) {
        final char value = this.currentSegment.getCharBigEndian(this.positionInSegment);
        this.positionInSegment += 2;
        return value;
    }

    // segment exactly exhausted: continue in the next one
    if (this.positionInSegment == this.limitInSegment) {
        advance();
        return readChar();
    }

    // value spans two segments: high byte first, then low byte
    final int high = readUnsignedByte();
    final int low = readUnsignedByte();
    return (char) ((high << 8) | low);
}
/**
 * Reads a big-endian {@code int}. If all four bytes lie in the current segment they are read in one
 * access; if the segment is exhausted the view advances first; if the value straddles a segment
 * boundary it is assembled byte by byte.
 *
 * @return The int value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public int readInt() throws IOException {
    // all four bytes available in the current segment
    if (this.positionInSegment < this.limitInSegment - 3) {
        final int value = this.currentSegment.getIntBigEndian(this.positionInSegment);
        this.positionInSegment += 4;
        return value;
    }

    // segment exactly exhausted: continue in the next one
    if (this.positionInSegment == this.limitInSegment) {
        advance();
        return readInt();
    }

    // value spans two segments: accumulate the bytes, most significant first
    int value = 0;
    for (int i = 0; i < 4; i++) {
        value = (value << 8) | readUnsignedByte();
    }
    return value;
}
/**
 * Reads a big-endian {@code long}. If all eight bytes lie in the current segment they are read in
 * one access; if the segment is exhausted the view advances first; if the value straddles a segment
 * boundary it is assembled byte by byte.
 *
 * @return The long value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public long readLong() throws IOException {
    // all eight bytes available in the current segment
    if (this.positionInSegment < this.limitInSegment - 7) {
        final long value = this.currentSegment.getLongBigEndian(this.positionInSegment);
        this.positionInSegment += 8;
        return value;
    }

    // segment exactly exhausted: continue in the next one
    if (this.positionInSegment == this.limitInSegment) {
        advance();
        return readLong();
    }

    // value spans two segments: accumulate the bytes, most significant first
    long value = 0L;
    for (int i = 0; i < 8; i++) {
        value = (value << 8) | (long) readUnsignedByte();
    }
    return value;
}
/**
 * Reads four bytes via {@link #readInt()} and reinterprets the bit pattern as a {@code float}.
 *
 * @return The float value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public float readFloat() throws IOException {
    final int bits = readInt();
    return Float.intBitsToFloat(bits);
}
/**
 * Reads eight bytes via {@link #readLong()} and reinterprets the bit pattern as a {@code double}.
 *
 * @return The double value read.
 * @throws IOException Thrown, if the bytes could not be read.
 */
@Override
public double readDouble() throws IOException {
    final long bits = readLong();
    return Double.longBitsToDouble(bits);
}
/**
 * Reads bytes up to the next {@code '\n'} or the end of the input and returns them as a string,
 * converting every byte to a char by zero-extension. The terminating {@code '\n'} is consumed but
 * not included; {@code '\r'} bytes are dropped wherever they occur.
 *
 * <p>As required by {@link java.io.DataInput#readLine()}, {@code null} is returned only when the end
 * of the input is hit before a single byte could be read. An empty line therefore yields the empty
 * string, so callers can tell it apart from the end of the input.
 *
 * @return The line read (possibly empty), or {@code null} if the input was already exhausted.
 * @throws IOException Thrown, if reading fails for a reason other than reaching the end of the input.
 */
@Override
public String readLine() throws IOException {
    final StringBuilder bld = new StringBuilder(32);
    boolean consumedAnyByte = false;

    try {
        int b;
        while ((b = readUnsignedByte()) != '\n') {
            consumedAnyByte = true;
            if (b != '\r') {
                bld.append((char) b);
            }
        }
        // the loop ended because a line terminator was read
        consumedAnyByte = true;
    } catch (EOFException ignored) {
        // reaching the end of the input terminates the (last) line; this is not an error
    }

    // carriage returns are never appended above, so no trailing '\r' needs to be trimmed here
    return consumedAnyByte ? bld.toString() : null;
}
/**
 * Reads a string in modified UTF-8 format: an unsigned 16 bit length (in bytes), followed by that many
 * encoded bytes. The encoded bytes are first read completely into a reusable byte buffer and then
 * decoded into a reusable char buffer; both buffers are (re)allocated when they are missing or too small.
 *
 * @return The decoded string.
 * @throws UTFDataFormatException Thrown, if the bytes are not valid modified UTF-8.
 * @throws IOException Thrown, if the length or the encoded bytes could not be read.
 */
@Override
public String readUTF() throws IOException {
    final int utflen = readUnsignedShort();

    // obtain the scratch buffers, growing them when they cannot hold utflen entries
    byte[] bytearr = this.utfByteBuffer;
    if (bytearr == null || bytearr.length < utflen) {
        bytearr = new byte[utflen];
        this.utfByteBuffer = bytearr;
    }
    char[] chararr = this.utfCharBuffer;
    if (chararr == null || chararr.length < utflen) {
        chararr = new char[utflen];
        this.utfCharBuffer = chararr;
    }

    readFully(bytearr, 0, utflen);

    int count = 0;
    int chararrCount = 0;

    // fast path: copy the leading run of single-byte (ASCII) characters
    while (count < utflen) {
        final int c = bytearr[count] & 0xff;
        if (c > 127) {
            break;
        }
        chararr[chararrCount++] = (char) c;
        count++;
    }

    // general path: decode 1-, 2- and 3-byte sequences, selected by the high nibble of the lead byte
    while (count < utflen) {
        final int c = bytearr[count] & 0xff;
        final int highNibble = c >> 4;

        if (highNibble <= 7) {
            /* 0xxxxxxx */
            count++;
            chararr[chararrCount++] = (char) c;
        } else if (highNibble == 12 || highNibble == 13) {
            /* 110x xxxx 10xx xxxx */
            count += 2;
            if (count > utflen) {
                throw new UTFDataFormatException("malformed input: partial character at end");
            }
            final int char2 = bytearr[count - 1];
            if ((char2 & 0xC0) != 0x80) {
                throw new UTFDataFormatException("malformed input around byte " + count);
            }
            chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
        } else if (highNibble == 14) {
            /* 1110 xxxx 10xx xxxx 10xx xxxx */
            count += 3;
            if (count > utflen) {
                throw new UTFDataFormatException("malformed input: partial character at end");
            }
            final int char2 = bytearr[count - 2];
            final int char3 = bytearr[count - 1];
            if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
                throw new UTFDataFormatException("malformed input around byte " + (count - 1));
            }
            chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
        } else {
            /* 10xx xxxx, 1111 xxxx */
            throw new UTFDataFormatException("malformed input around byte " + count);
        }
    }

    // The number of chars produced may be less than utflen
    return new String(chararr, 0, chararrCount);
}
/**
 * Skips up to {@code n} bytes, crossing segment boundaries as needed. Fewer bytes are skipped only
 * when the input ends before {@code n} bytes could be skipped.
 *
 * <p>When the input ends during a partial skip, the read position is moved to the end of the last
 * segment, so that the bytes reported as skipped cannot be read again. This matches how
 * {@link #read(byte[], int, int)} handles a partial read at the end of the input.
 *
 * @param n The number of bytes to skip.
 * @return The number of bytes actually skipped; {@code 0} if the input was already exhausted.
 * @throws IllegalArgumentException Thrown, if {@code n} is negative.
 * @throws IOException Thrown, if advancing to the next segment fails for a reason other than EOF.
 */
@Override
public int skipBytes(int n) throws IOException {
    if (n < 0) {
        throw new IllegalArgumentException();
    }

    int remaining = this.limitInSegment - this.positionInSegment;

    // fast path: the skip stays within the current segment
    if (remaining >= n) {
        this.positionInSegment += n;
        return n;
    }

    // the current segment is exactly exhausted: move on first, nothing skipped if input is at its end
    if (remaining == 0) {
        try {
            advance();
        } catch (EOFException eofex) {
            return 0;
        }
        remaining = this.limitInSegment - this.positionInSegment;
    }

    int skipped = 0;
    while (true) {
        final int toSkip = Math.min(remaining, n);
        n -= toSkip;
        skipped += toSkip;

        if (n <= 0) {
            // the skip ends inside this segment
            this.positionInSegment += toSkip;
            return skipped;
        }

        try {
            advance();
        } catch (EOFException eofex) {
            // No further segment: the rest of the current segment was counted as skipped, so the
            // position must move past it as well. Otherwise those bytes would be readable again.
            this.positionInSegment += toSkip;
            return skipped;
        }
        remaining = this.limitInSegment - this.positionInSegment;
    }
}
/**
 * Skips exactly {@code numBytes} bytes, crossing segment boundaries as needed. Unlike
 * {@link #skipBytes(int)}, running out of segments is not tolerated: the exception raised by
 * {@link #advance()} is propagated to the caller.
 *
 * @param numBytes The number of bytes to skip.
 * @throws IllegalArgumentException Thrown, if {@code numBytes} is negative.
 * @throws IOException Thrown, if advancing to the next segment fails (including EOF).
 */
@Override
public void skipBytesToRead(int numBytes) throws IOException {
    if (numBytes < 0) {
        throw new IllegalArgumentException();
    }

    int remaining = this.limitInSegment - this.positionInSegment;

    // fast path: the skip stays within the current segment
    if (remaining >= numBytes) {
        this.positionInSegment += numBytes;
        return;
    }

    // the current segment is exactly exhausted: start with the next one
    if (remaining == 0) {
        advance();
        remaining = this.limitInSegment - this.positionInSegment;
    }

    // consume whole segments while more bytes are requested than the segment offers
    while (numBytes > remaining) {
        numBytes -= remaining;
        advance();
        remaining = this.limitInSegment - this.positionInSegment;
    }

    // the rest of the skip ends inside the current segment
    this.positionInSegment += numBytes;
}
/**
 * Returns the number of bytes that are skipped at the beginning of each segment.
 *
 * @return The header length given at construction time.
 */
public int getHeaderLength() {
    return this.headerLength;
}
/**
 * Copies up to {@code len} bytes from this view into {@code segment}, beginning at offset {@code off}
 * of the target, crossing source segment boundaries as needed.
 *
 * @param segment memory segment to store the data to.
 * @param off offset into memory segment.
 * @param len byte length to read.
 * @return the number of bytes actually read, or -1 if the current segment was exhausted and no further
 *         segment was available before any byte could be read.
 * @throws IOException if an I/O error occurs.
 */
public int read(MemorySegment segment, int off, int len) throws IOException {
    int available = this.limitInSegment - this.positionInSegment;

    // fast path: the request is fully served by the current segment
    if (available >= len) {
        this.currentSegment.copyTo(this.positionInSegment, segment, off, len);
        this.positionInSegment += len;
        return len;
    }

    // the current segment is exactly exhausted: move on first, and signal EOF if nothing is left
    if (available == 0) {
        try {
            advance();
        } catch (EOFException eof) {
            return -1;
        }
        available = this.limitInSegment - this.positionInSegment;
    }

    int copied = 0;
    for (;;) {
        final int chunk = Math.min(available, len - copied);
        this.currentSegment.copyTo(this.positionInSegment, segment, off, chunk);
        off += chunk;
        copied += chunk;

        if (len <= copied) {
            // request satisfied within this segment
            this.positionInSegment += chunk;
            return len;
        }

        try {
            advance();
        } catch (EOFException eof) {
            // no further segment: mark the consumed bytes as read and report the partial result
            this.positionInSegment += chunk;
            return copied;
        }
        available = this.limitInSegment - this.positionInSegment;
    }
}
}