blob: e36528d59eb19a54f7c44dbb712451012a83ca0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.memory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import java.io.IOException;
import java.io.UTFDataFormatException;
/**
* The base class for all output views that are backed by multiple memory pages. This base class contains all
* encoding methods to write data to a page and detect page boundary crossing. The concrete sub classes must
* implement the methods to collect the current page and provide the next memory page once the boundary is crossed.
*
* <p>The paging assumes that all memory segments are of the same size.
*/
public abstract class AbstractPagedOutputView implements DataOutputView {
private MemorySegment currentSegment; // the current memory segment to write to
protected final int segmentSize; // the size of the memory segments
protected final int headerLength; // the number of bytes to skip at the beginning of each segment
private int positionInSegment; // the offset in the current segment
private byte[] utfBuffer; // the reusable array for UTF encodings
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
/**
* Creates a new output view that writes initially to the given initial segment. All segments in the
* view have to be of the given {@code segmentSize}. A header of length {@code headerLength} is left
* at the beginning of each segment.
*
* @param initialSegment The segment that the view starts writing to.
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedOutputView(MemorySegment initialSegment, int segmentSize, int headerLength) {
if (initialSegment == null) {
throw new NullPointerException("Initial Segment may not be null");
}
this.segmentSize = segmentSize;
this.headerLength = headerLength;
this.currentSegment = initialSegment;
this.positionInSegment = headerLength;
}
/**
* @param segmentSize The size of the memory segments.
* @param headerLength The number of bytes to skip at the beginning of each segment for the header.
*/
protected AbstractPagedOutputView(int segmentSize, int headerLength) {
this.segmentSize = segmentSize;
this.headerLength = headerLength;
}
// --------------------------------------------------------------------------------------------
// Page Management
// --------------------------------------------------------------------------------------------
/**
* This method must return a segment. If no more segments are available, it must throw an
* {@link java.io.EOFException}.
*
* @param current The current memory segment
* @param positionInCurrent The position in the segment, one after the last valid byte.
* @return The next memory segment.
*
* @throws IOException
*/
protected abstract MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException;
/**
* Gets the segment that the view currently writes to.
*
* @return The segment the view currently writes to.
*/
public MemorySegment getCurrentSegment() {
return this.currentSegment;
}
/**
* Gets the current write position (the position where the next bytes will be written)
* in the current memory segment.
*
* @return The current write offset in the current memory segment.
*/
public int getCurrentPositionInSegment() {
return this.positionInSegment;
}
/**
* Gets the size of the segments used by this view.
*
* @return The memory segment size.
*/
public int getSegmentSize() {
return this.segmentSize;
}
/**
* Moves the output view to the next page. This method invokes internally the
* {@link #nextSegment(MemorySegment, int)} method to give the current memory segment to the concrete subclass'
* implementation and obtain the next segment to write to. Writing will continue inside the new segment
* after the header.
*
* @throws IOException Thrown, if the current segment could not be processed or a new segment could not
* be obtained.
*/
public void advance() throws IOException {
this.currentSegment = nextSegment(this.currentSegment, this.positionInSegment);
this.positionInSegment = this.headerLength;
}
/**
* Sets the internal state to the given memory segment and the given position within the segment.
*
* @param seg The memory segment to write the next bytes to.
* @param position The position to start writing the next bytes to.
*/
protected void seekOutput(MemorySegment seg, int position) {
this.currentSegment = seg;
this.positionInSegment = position;
}
/**
* Clears the internal state. Any successive write calls will fail until either {@link #advance()} or
* {@link #seekOutput(MemorySegment, int)} is called.
*
* @see #advance()
* @see #seekOutput(MemorySegment, int)
*/
protected void clear() {
this.currentSegment = null;
this.positionInSegment = this.headerLength;
}
// --------------------------------------------------------------------------------------------
// Data Output Specific methods
// --------------------------------------------------------------------------------------------
@Override
public void write(int b) throws IOException {
writeByte(b);
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
int remaining = this.segmentSize - this.positionInSegment;
if (remaining >= len) {
this.currentSegment.put(this.positionInSegment, b, off, len);
this.positionInSegment += len;
}
else {
if (remaining == 0) {
advance();
remaining = this.segmentSize - this.positionInSegment;
}
while (true) {
int toPut = Math.min(remaining, len);
this.currentSegment.put(this.positionInSegment, b, off, toPut);
off += toPut;
len -= toPut;
if (len > 0) {
this.positionInSegment = this.segmentSize;
advance();
remaining = this.segmentSize - this.positionInSegment;
}
else {
this.positionInSegment += toPut;
break;
}
}
}
}
@Override
public void writeBoolean(boolean v) throws IOException {
writeByte(v ? 1 : 0);
}
@Override
public void writeByte(int v) throws IOException {
if (this.positionInSegment < this.segmentSize) {
this.currentSegment.put(this.positionInSegment++, (byte) v);
}
else {
advance();
writeByte(v);
}
}
@Override
public void writeShort(int v) throws IOException {
if (this.positionInSegment < this.segmentSize - 1) {
this.currentSegment.putShortBigEndian(this.positionInSegment, (short) v);
this.positionInSegment += 2;
}
else if (this.positionInSegment == this.segmentSize) {
advance();
writeShort(v);
}
else {
writeByte(v >> 8);
writeByte(v);
}
}
@Override
public void writeChar(int v) throws IOException {
if (this.positionInSegment < this.segmentSize - 1) {
this.currentSegment.putCharBigEndian(this.positionInSegment, (char) v);
this.positionInSegment += 2;
}
else if (this.positionInSegment == this.segmentSize) {
advance();
writeChar(v);
}
else {
writeByte(v >> 8);
writeByte(v);
}
}
@Override
public void writeInt(int v) throws IOException {
if (this.positionInSegment < this.segmentSize - 3) {
this.currentSegment.putIntBigEndian(this.positionInSegment, v);
this.positionInSegment += 4;
}
else if (this.positionInSegment == this.segmentSize) {
advance();
writeInt(v);
}
else {
writeByte(v >> 24);
writeByte(v >> 16);
writeByte(v >> 8);
writeByte(v);
}
}
@Override
public void writeLong(long v) throws IOException {
if (this.positionInSegment < this.segmentSize - 7) {
this.currentSegment.putLongBigEndian(this.positionInSegment, v);
this.positionInSegment += 8;
}
else if (this.positionInSegment == this.segmentSize) {
advance();
writeLong(v);
}
else {
writeByte((int) (v >> 56));
writeByte((int) (v >> 48));
writeByte((int) (v >> 40));
writeByte((int) (v >> 32));
writeByte((int) (v >> 24));
writeByte((int) (v >> 16));
writeByte((int) (v >> 8));
writeByte((int) v);
}
}
@Override
public void writeFloat(float v) throws IOException {
writeInt(Float.floatToRawIntBits(v));
}
@Override
public void writeDouble(double v) throws IOException {
writeLong(Double.doubleToRawLongBits(v));
}
@Override
public void writeBytes(String s) throws IOException {
for (int i = 0; i < s.length(); i++) {
writeByte(s.charAt(i));
}
}
@Override
public void writeChars(String s) throws IOException {
for (int i = 0; i < s.length(); i++) {
writeChar(s.charAt(i));
}
}
@Override
public void writeUTF(String str) throws IOException {
int strlen = str.length();
int utflen = 0;
int c, count = 0;
/* use charAt instead of copying String to char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
}
if (utflen > 65535) {
throw new UTFDataFormatException("encoded string too long: " + utflen + " memory");
}
if (this.utfBuffer == null || this.utfBuffer.length < utflen + 2) {
this.utfBuffer = new byte[utflen + 2];
}
final byte[] bytearr = this.utfBuffer;
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) (utflen & 0xFF);
int i;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) {
break;
}
bytearr[count++] = (byte) c;
}
for (; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
bytearr[count++] = (byte) c;
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | (c & 0x3F));
}
}
write(bytearr, 0, utflen + 2);
}
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
while (numBytes > 0) {
final int remaining = this.segmentSize - this.positionInSegment;
if (numBytes <= remaining) {
this.positionInSegment += numBytes;
return;
}
this.positionInSegment = this.segmentSize;
advance();
numBytes -= remaining;
}
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
while (numBytes > 0) {
final int remaining = this.segmentSize - this.positionInSegment;
if (numBytes <= remaining) {
this.currentSegment.put(source, this.positionInSegment, numBytes);
this.positionInSegment += numBytes;
return;
}
if (remaining > 0) {
this.currentSegment.put(source, this.positionInSegment, remaining);
this.positionInSegment = this.segmentSize;
numBytes -= remaining;
}
advance();
}
}
public int getHeaderLength() {
return headerLength;
}
@Override
public void write(MemorySegment segment, int off, int len) throws IOException {
int remaining = this.segmentSize - this.positionInSegment;
if (remaining >= len) {
segment.copyTo(off, currentSegment, positionInSegment, len);
this.positionInSegment += len;
} else {
if (remaining == 0) {
advance();
remaining = this.segmentSize - this.positionInSegment;
}
while (true) {
int toPut = Math.min(remaining, len);
segment.copyTo(off, currentSegment, positionInSegment, toPut);
off += toPut;
len -= toPut;
if (len > 0) {
this.positionInSegment = this.segmentSize;
advance();
remaining = this.segmentSize - this.positionInSegment;
}
else {
this.positionInSegment += toPut;
break;
}
}
}
}
}