/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.flink.runtime.memory; |
| |
| import org.apache.flink.core.memory.DataInputView; |
| import org.apache.flink.core.memory.MemorySegment; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.UTFDataFormatException; |
| |
| |
| /** |
| * The base class for all input views that are backed by multiple memory pages. This base class contains all |
| * decoding methods to read data from a page and detect page boundary crossing. The concrete sub classes must |
| * implement the methods to provide the next memory page once the boundary is crossed. |
| */ |
| public abstract class AbstractPagedInputView implements DataInputView { |
| |
| private MemorySegment currentSegment; |
| |
| protected final int headerLength; // the number of bytes to skip at the beginning of each segment |
| |
| private int positionInSegment; // the offset in the current segment |
| |
| private int limitInSegment; // the limit in the current segment before switching to the next |
| |
| private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding |
| private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding |
| |
| |
| // -------------------------------------------------------------------------------------------- |
| // Constructors |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Creates a new view that starts with the given segment. The input starts directly after the header |
| * of the given page. If the header size is zero, it starts at the beginning. The specified initial |
| * limit describes up to which position data may be read from the current segment, before the view must |
| * advance to the next segment. |
| * |
| * @param initialSegment The memory segment to start reading from. |
| * @param initialLimit The position one after the last valid byte in the initial segment. |
| * @param headerLength The number of bytes to skip at the beginning of each segment for the header. This |
| * length must be the same for all memory segments. |
| */ |
| protected AbstractPagedInputView(MemorySegment initialSegment, int initialLimit, int headerLength) { |
| this.headerLength = headerLength; |
| this.positionInSegment = headerLength; |
| seekInput(initialSegment, headerLength, initialLimit); |
| } |
| |
| /** |
| * Creates a new view that is initially not bound to a memory segment. This constructor is typically |
| * for views that always seek first. |
| * |
| * <p>WARNING: The view is not readable until the first call to either {@link #advance()}, |
| * or to {@link #seekInput(MemorySegment, int, int)}. |
| * |
| * @param headerLength The number of bytes to skip at the beginning of each segment for the header. |
| */ |
| protected AbstractPagedInputView(int headerLength) { |
| this.headerLength = headerLength; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Page Management |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * Gets the memory segment that will be used to read the next bytes from. If the segment is exactly exhausted, |
| * meaning that the last byte read was the last byte available in the segment, then this segment will |
| * not serve the next bytes. The segment to serve the next bytes will be obtained through the |
| * {@link #nextSegment(MemorySegment)} method. |
| * |
| * @return The current memory segment. |
| */ |
| public MemorySegment getCurrentSegment() { |
| return this.currentSegment; |
| } |
| |
| /** |
| * Gets the position from which the next byte will be read. If that position is equal to the current limit, |
| * then the next byte will be read from next segment. |
| * |
| * @return The position from which the next byte will be read. |
| * @see #getCurrentSegmentLimit() |
| */ |
| public int getCurrentPositionInSegment() { |
| return this.positionInSegment; |
| } |
| |
| /** |
| * Gets the current limit in the memory segment. This value points to the byte one after the last valid byte |
| * in the memory segment. |
| * |
| * @return The current limit in the memory segment. |
| * @see #getCurrentPositionInSegment() |
| */ |
| public int getCurrentSegmentLimit() { |
| return this.limitInSegment; |
| } |
| |
| public void updateCurrentSegmentLimit() { |
| this.limitInSegment = getLimitForSegment(this.currentSegment); |
| } |
| |
| /** |
| * The method by which concrete subclasses realize page crossing. This method is invoked when the current page |
| * is exhausted and a new page is required to continue the reading. If no further page is available, this |
| * method must throw an {@link EOFException}. |
| * |
| * @param current The current page that was read to its limit. May be {@code null}, if this method is |
| * invoked for the first time. |
| * @return The next page from which the reading should continue. May not be {@code null}. If the input is |
| * exhausted, an {@link EOFException} must be thrown instead. |
| * |
| * @throws EOFException Thrown, if no further segment is available. |
| * @throws IOException Thrown, if the method cannot provide the next page due to an I/O related problem. |
| */ |
| protected abstract MemorySegment nextSegment(MemorySegment current) throws EOFException, IOException; |
| |
| /** |
| * Gets the limit for reading bytes from the given memory segment. This method must return the position |
| * of the byte after the last valid byte in the given memory segment. When the position returned by this |
| * method is reached, the view will attempt to switch to the next memory segment. |
| * |
| * @param segment The segment to determine the limit for. |
| * @return The limit for the given memory segment. |
| */ |
| protected abstract int getLimitForSegment(MemorySegment segment); |
| |
| /** |
| * Advances the view to the next memory segment. The reading will continue after the header of the next |
| * segment. This method uses {@link #nextSegment(MemorySegment)} and {@link #getLimitForSegment(MemorySegment)} |
| * to get the next segment and set its limit. |
| * |
| * @throws IOException Thrown, if the next segment could not be obtained. |
| * |
| * @see #nextSegment(MemorySegment) |
| * @see #getLimitForSegment(MemorySegment) |
| */ |
| public final void advance() throws IOException { |
| // note: this code ensures that in case of EOF, we stay at the same position such that |
| // EOF is reproducible (if nextSegment throws a reproducible EOFException) |
| this.currentSegment = nextSegment(this.currentSegment); |
| this.limitInSegment = getLimitForSegment(this.currentSegment); |
| this.positionInSegment = this.headerLength; |
| afterAdvance(); |
| } |
| |
| /** |
| * This method will run after advance(). Override it if you want to use it. |
| */ |
| protected void afterAdvance() {} |
| |
| /** |
| * Sets the internal state of the view such that the next bytes will be read from the given memory segment, |
| * starting at the given position. The memory segment will provide bytes up to the given limit position. |
| * |
| * @param segment The segment to read the next bytes from. |
| * @param positionInSegment The position in the segment to start reading from. |
| * @param limitInSegment The limit in the segment. When reached, the view will attempt to switch to |
| * the next segment. |
| */ |
| protected void seekInput(MemorySegment segment, int positionInSegment, int limitInSegment) { |
| this.currentSegment = segment; |
| this.positionInSegment = positionInSegment; |
| this.limitInSegment = limitInSegment; |
| } |
| |
| /** |
| * Clears the internal state of the view. After this call, all read attempts will fail, until the |
| * {@link #advance()} or {@link #seekInput(MemorySegment, int, int)} method have been invoked. |
| */ |
| protected void clear() { |
| this.currentSegment = null; |
| this.positionInSegment = this.headerLength; |
| this.limitInSegment = headerLength; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Data Input Specific methods |
| // -------------------------------------------------------------------------------------------- |
| |
| @Override |
| public int read(byte[] b) throws IOException{ |
| return read(b, 0, b.length); |
| } |
| |
| @Override |
| public int read(byte[] b, int off, int len) throws IOException{ |
| if (off < 0 || len < 0 || off + len > b.length) { |
| throw new IndexOutOfBoundsException(); |
| } |
| |
| int remaining = this.limitInSegment - this.positionInSegment; |
| if (remaining >= len) { |
| this.currentSegment.get(this.positionInSegment, b, off, len); |
| this.positionInSegment += len; |
| return len; |
| } |
| else { |
| if (remaining == 0) { |
| try { |
| advance(); |
| } |
| catch (EOFException eof) { |
| return -1; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| |
| int bytesRead = 0; |
| while (true) { |
| int toRead = Math.min(remaining, len - bytesRead); |
| this.currentSegment.get(this.positionInSegment, b, off, toRead); |
| off += toRead; |
| bytesRead += toRead; |
| |
| if (len > bytesRead) { |
| try { |
| advance(); |
| } |
| catch (EOFException eof) { |
| this.positionInSegment += toRead; |
| return bytesRead; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| else { |
| this.positionInSegment += toRead; |
| break; |
| } |
| } |
| return len; |
| } |
| } |
| |
| @Override |
| public void readFully(byte[] b) throws IOException { |
| readFully(b, 0, b.length); |
| } |
| |
| @Override |
| public void readFully(byte[] b, int off, int len) throws IOException { |
| int bytesRead = read(b, off, len); |
| |
| if (bytesRead < len){ |
| throw new EOFException("There is no enough data left in the DataInputView."); |
| } |
| } |
| |
| @Override |
| public boolean readBoolean() throws IOException { |
| return readByte() == 1; |
| } |
| |
| @Override |
| public byte readByte() throws IOException { |
| if (this.positionInSegment < this.limitInSegment) { |
| return this.currentSegment.get(this.positionInSegment++); |
| } |
| else { |
| advance(); |
| return readByte(); |
| } |
| } |
| |
| @Override |
| public int readUnsignedByte() throws IOException { |
| return readByte() & 0xff; |
| } |
| |
| @Override |
| public short readShort() throws IOException { |
| if (this.positionInSegment < this.limitInSegment - 1) { |
| final short v = this.currentSegment.getShortBigEndian(this.positionInSegment); |
| this.positionInSegment += 2; |
| return v; |
| } |
| else if (this.positionInSegment == this.limitInSegment) { |
| advance(); |
| return readShort(); |
| } |
| else { |
| return (short) ((readUnsignedByte() << 8) | readUnsignedByte()); |
| } |
| } |
| |
| @Override |
| public int readUnsignedShort() throws IOException { |
| if (this.positionInSegment < this.limitInSegment - 1) { |
| final int v = this.currentSegment.getShortBigEndian(this.positionInSegment) & 0xffff; |
| this.positionInSegment += 2; |
| return v; |
| } |
| else if (this.positionInSegment == this.limitInSegment) { |
| advance(); |
| return readUnsignedShort(); |
| } |
| else { |
| return (readUnsignedByte() << 8) | readUnsignedByte(); |
| } |
| } |
| |
| @Override |
| public char readChar() throws IOException { |
| if (this.positionInSegment < this.limitInSegment - 1) { |
| final char v = this.currentSegment.getCharBigEndian(this.positionInSegment); |
| this.positionInSegment += 2; |
| return v; |
| } |
| else if (this.positionInSegment == this.limitInSegment) { |
| advance(); |
| return readChar(); |
| } |
| else { |
| return (char) ((readUnsignedByte() << 8) | readUnsignedByte()); |
| } |
| } |
| |
| @Override |
| public int readInt() throws IOException { |
| if (this.positionInSegment < this.limitInSegment - 3) { |
| final int v = this.currentSegment.getIntBigEndian(this.positionInSegment); |
| this.positionInSegment += 4; |
| return v; |
| } |
| else if (this.positionInSegment == this.limitInSegment) { |
| advance(); |
| return readInt(); |
| } |
| else { |
| return (readUnsignedByte() << 24) | |
| (readUnsignedByte() << 16) | |
| (readUnsignedByte() << 8) | |
| readUnsignedByte(); |
| } |
| } |
| |
| @Override |
| public long readLong() throws IOException { |
| if (this.positionInSegment < this.limitInSegment - 7) { |
| final long v = this.currentSegment.getLongBigEndian(this.positionInSegment); |
| this.positionInSegment += 8; |
| return v; |
| } |
| else if (this.positionInSegment == this.limitInSegment) { |
| advance(); |
| return readLong(); |
| } |
| else { |
| long l = 0L; |
| l |= ((long) readUnsignedByte()) << 56; |
| l |= ((long) readUnsignedByte()) << 48; |
| l |= ((long) readUnsignedByte()) << 40; |
| l |= ((long) readUnsignedByte()) << 32; |
| l |= ((long) readUnsignedByte()) << 24; |
| l |= ((long) readUnsignedByte()) << 16; |
| l |= ((long) readUnsignedByte()) << 8; |
| l |= (long) readUnsignedByte(); |
| return l; |
| } |
| } |
| |
| @Override |
| public float readFloat() throws IOException { |
| return Float.intBitsToFloat(readInt()); |
| } |
| |
| @Override |
| public double readDouble() throws IOException { |
| return Double.longBitsToDouble(readLong()); |
| } |
| |
| @Override |
| public String readLine() throws IOException { |
| final StringBuilder bld = new StringBuilder(32); |
| |
| try { |
| int b; |
| while ((b = readUnsignedByte()) != '\n') { |
| if (b != '\r') { |
| bld.append((char) b); |
| } |
| } |
| } |
| catch (EOFException eofex) {} |
| |
| if (bld.length() == 0) { |
| return null; |
| } |
| |
| // trim a trailing carriage return |
| int len = bld.length(); |
| if (len > 0 && bld.charAt(len - 1) == '\r') { |
| bld.setLength(len - 1); |
| } |
| return bld.toString(); |
| } |
| |
| @Override |
| public String readUTF() throws IOException { |
| final int utflen = readUnsignedShort(); |
| |
| final byte[] bytearr; |
| final char[] chararr; |
| |
| if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) { |
| bytearr = new byte[utflen]; |
| this.utfByteBuffer = bytearr; |
| } else { |
| bytearr = this.utfByteBuffer; |
| } |
| if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) { |
| chararr = new char[utflen]; |
| this.utfCharBuffer = chararr; |
| } else { |
| chararr = this.utfCharBuffer; |
| } |
| |
| int c, char2, char3; |
| int count = 0; |
| int chararrCount = 0; |
| |
| readFully(bytearr, 0, utflen); |
| |
| while (count < utflen) { |
| c = (int) bytearr[count] & 0xff; |
| if (c > 127) { |
| break; |
| } |
| count++; |
| chararr[chararrCount++] = (char) c; |
| } |
| |
| while (count < utflen) { |
| c = (int) bytearr[count] & 0xff; |
| switch (c >> 4) { |
| case 0: |
| case 1: |
| case 2: |
| case 3: |
| case 4: |
| case 5: |
| case 6: |
| case 7: |
| /* 0xxxxxxx */ |
| count++; |
| chararr[chararrCount++] = (char) c; |
| break; |
| case 12: |
| case 13: |
| /* 110x xxxx 10xx xxxx */ |
| count += 2; |
| if (count > utflen) { |
| throw new UTFDataFormatException("malformed input: partial character at end"); |
| } |
| char2 = (int) bytearr[count - 1]; |
| if ((char2 & 0xC0) != 0x80) { |
| throw new UTFDataFormatException("malformed input around byte " + count); |
| } |
| chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); |
| break; |
| case 14: |
| /* 1110 xxxx 10xx xxxx 10xx xxxx */ |
| count += 3; |
| if (count > utflen) { |
| throw new UTFDataFormatException("malformed input: partial character at end"); |
| } |
| char2 = (int) bytearr[count - 2]; |
| char3 = (int) bytearr[count - 1]; |
| if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { |
| throw new UTFDataFormatException("malformed input around byte " + (count - 1)); |
| } |
| chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); |
| break; |
| default: |
| /* 10xx xxxx, 1111 xxxx */ |
| throw new UTFDataFormatException("malformed input around byte " + count); |
| } |
| } |
| // The number of chars produced may be less than utflen |
| return new String(chararr, 0, chararrCount); |
| } |
| |
| @Override |
| public int skipBytes(int n) throws IOException { |
| if (n < 0) { |
| throw new IllegalArgumentException(); |
| } |
| |
| int remaining = this.limitInSegment - this.positionInSegment; |
| if (remaining >= n) { |
| this.positionInSegment += n; |
| return n; |
| } |
| else { |
| if (remaining == 0) { |
| try { |
| advance(); |
| } catch (EOFException eofex) { |
| return 0; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| |
| int skipped = 0; |
| while (true) { |
| int toSkip = Math.min(remaining, n); |
| n -= toSkip; |
| skipped += toSkip; |
| |
| if (n > 0) { |
| try { |
| advance(); |
| } catch (EOFException eofex) { |
| return skipped; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| else { |
| this.positionInSegment += toSkip; |
| break; |
| } |
| } |
| return skipped; |
| } |
| } |
| |
| @Override |
| public void skipBytesToRead(int numBytes) throws IOException { |
| if (numBytes < 0) { |
| throw new IllegalArgumentException(); |
| } |
| |
| int remaining = this.limitInSegment - this.positionInSegment; |
| if (remaining >= numBytes) { |
| this.positionInSegment += numBytes; |
| } |
| else { |
| if (remaining == 0) { |
| advance(); |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| |
| while (true) { |
| if (numBytes > remaining) { |
| numBytes -= remaining; |
| advance(); |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| else { |
| this.positionInSegment += numBytes; |
| break; |
| } |
| } |
| } |
| } |
| |
| public int getHeaderLength() { |
| return headerLength; |
| } |
| |
| /** |
| * Reads up to {@code len} bytes of memory and stores it into {@code segment} starting at offset {@code off}. |
| * It returns the number of read bytes or -1 if there is no more data left. |
| * |
| * @param segment memory segment to store the data to. |
| * @param off offset into memory segment. |
| * @param len byte length to read. |
| * @return the number of actually read bytes of -1 if there is no more data left. |
| * @throws IOException if an I/O error occurs. |
| */ |
| public int read(MemorySegment segment, int off, int len) throws IOException{ |
| int remaining = this.limitInSegment - this.positionInSegment; |
| if (remaining >= len) { |
| this.currentSegment.copyTo(this.positionInSegment, segment, off, len); |
| this.positionInSegment += len; |
| return len; |
| } |
| else { |
| if (remaining == 0) { |
| try { |
| advance(); |
| } |
| catch (EOFException eof) { |
| return -1; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| |
| int bytesRead = 0; |
| while (true) { |
| int toRead = Math.min(remaining, len - bytesRead); |
| this.currentSegment.copyTo(this.positionInSegment, segment, off, toRead); |
| off += toRead; |
| bytesRead += toRead; |
| |
| if (len > bytesRead) { |
| try { |
| advance(); |
| } |
| catch (EOFException eof) { |
| this.positionInSegment += toRead; |
| return bytesRead; |
| } |
| remaining = this.limitInSegment - this.positionInSegment; |
| } |
| else { |
| this.positionInSegment += toRead; |
| break; |
| } |
| } |
| return len; |
| } |
| } |
| } |