blob: b3c8cdaeb29068668422c4fa31a19e3229c5c294 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util.io;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
/**
* Data input based on {@code Unsafe} operations.
*/
public class GridUnsafeDataInput extends InputStream implements GridDataInput {
/** */
private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
/** Maximum data block length. */
private static final int MAX_BLOCK_SIZE = 1024;
/** Length of char buffer (for reading strings). */
private static final int CHAR_BUF_SIZE = 256;
/** Buffer for reading general/block data. */
@GridToStringExclude
private final byte[] utfBuf = new byte[MAX_BLOCK_SIZE];
/** Char buffer for fast string reads. */
@GridToStringExclude
private final char[] urfCBuf = new char[CHAR_BUF_SIZE];
/** Current offset into buf. */
private int pos;
/** End offset of valid data in buf, or -1 if no more block data. */
private int end = -1;
/** Bytes. */
@GridToStringExclude
private byte[] buf;
/** Offset. */
private int off;
/** Max. */
private int max;
/** Underlying input stream. */
@GridToStringExclude
private InputStream in;
/** Buffer for reading from stream. */
@GridToStringExclude
private byte[] inBuf = new byte[1024];
/** Maximum message size. */
private int maxOff;
/** Last length check timestamp. */
private long lastCheck;
/** {@inheritDoc} */
@Override public void bytes(byte[] bytes, int len) {
bytes(bytes, 0, len);
}
/**
* @param bytes Bytes.
* @param off Offset.
* @param len Length.
*/
public void bytes(byte[] bytes, int off, int len) {
buf = bytes;
max = len;
this.off = off;
}
/** {@inheritDoc} */
@Override public void inputStream(InputStream in) throws IOException {
this.in = in;
buf = inBuf;
}
/**
* Reads from stream to buffer. If stream is {@code null}, this method is no-op.
*
* @param size Number of bytes to read.
* @throws IOException In case of error.
*/
private void fromStream(int size) throws IOException {
if (in == null)
return;
maxOff = Math.max(maxOff, size);
long now = U.currentTimeMillis();
// Increase size of buffer if needed.
if (size > inBuf.length)
buf = inBuf = new byte[Math.max(inBuf.length << 1, size)]; // Grow.
else if (now - lastCheck > CHECK_FREQ) {
int halfSize = inBuf.length >> 1;
if (maxOff < halfSize) {
byte[] newInBuf = new byte[halfSize]; // Shrink.
System.arraycopy(inBuf, 0, newInBuf, 0, off);
buf = inBuf = newInBuf;
}
maxOff = 0;
lastCheck = now;
}
off = 0;
max = 0;
while (max != size) {
int read = in.read(inBuf, max, size - max);
if (read == -1)
throw new EOFException("End of stream reached: " + in);
max += read;
}
}
/**
* @param more Bytes to move forward.
* @return Old offset value.
* @throws IOException In case of error.
*/
private int offset(int more) throws IOException {
int old = off;
off += more;
if (off > max)
throw new EOFException("Attempt to read beyond the end of the stream " +
"[pos=" + off + ", more=" + more + ", max=" + max + ']');
return old;
}
/** {@inheritDoc} */
@SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
@Override public void reset() throws IOException {
in = null;
off = 0;
max = 0;
}
/** {@inheritDoc} */
@Override public byte[] readByteArray() throws IOException {
int arrSize = readInt();
fromStream(arrSize);
byte[] arr = new byte[arrSize];
System.arraycopy(buf, offset(arrSize), arr, 0, arrSize);
return arr;
}
/** {@inheritDoc} */
@Override public short[] readShortArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 1;
fromStream(bytesToCp);
short[] arr = new short[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getShortLE(buf, off);
off += 2;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, SHORT_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public int[] readIntArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 2;
fromStream(bytesToCp);
int[] arr = new int[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getIntLE(buf, off);
off += 4;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, INT_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public double[] readDoubleArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 3;
fromStream(bytesToCp);
double[] arr = new double[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getDoubleLE(buf, off);
off += 8;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, DOUBLE_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public boolean[] readBooleanArray() throws IOException {
int arrSize = readInt();
boolean[] vals = new boolean[arrSize];
for (int i = 0; i < arrSize; i++)
vals[i] = readBoolean();
return vals;
}
/** {@inheritDoc} */
@Override public char[] readCharArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 1;
fromStream(bytesToCp);
char[] arr = new char[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getCharLE(buf, off);
off += 2;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, CHAR_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public long[] readLongArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 3;
fromStream(bytesToCp);
long[] arr = new long[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getLongLE(buf, off);
off += 8;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, LONG_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public float[] readFloatArray() throws IOException {
int arrSize = readInt();
int bytesToCp = arrSize << 2;
fromStream(bytesToCp);
float[] arr = new float[arrSize];
long off = BYTE_ARR_OFF + offset(bytesToCp);
if (BIG_ENDIAN) {
for (int i = 0; i < arr.length; i++) {
arr[i] = GridUnsafe.getFloatLE(buf, off);
off += 4;
}
}
else
GridUnsafe.copyMemory(buf, off, arr, FLOAT_ARR_OFF, bytesToCp);
return arr;
}
/** {@inheritDoc} */
@Override public void readFully(byte[] b) throws IOException {
int len = b.length;
fromStream(len);
System.arraycopy(buf, offset(len), b, 0, len);
}
/** {@inheritDoc} */
@Override public void readFully(byte[] b, int off, int len) throws IOException {
fromStream(len);
System.arraycopy(buf, offset(len), b, off, len);
}
/** {@inheritDoc} */
@Override public int skipBytes(int n) throws IOException {
if (off + n > max)
n = max - off;
off += n;
return n;
}
/** {@inheritDoc} */
@Override public boolean readBoolean() throws IOException {
fromStream(1);
return GridUnsafe.getBoolean(buf, BYTE_ARR_OFF + offset(1));
}
/** {@inheritDoc} */
@Override public byte readByte() throws IOException {
fromStream(1);
return GridUnsafe.getByte(buf, BYTE_ARR_OFF + offset(1));
}
/** {@inheritDoc} */
@Override public int readUnsignedByte() throws IOException {
return readByte() & 0xff;
}
/** {@inheritDoc} */
@Override public short readShort() throws IOException {
fromStream(2);
long off = BYTE_ARR_OFF + offset(2);
return BIG_ENDIAN ? GridUnsafe.getShortLE(buf, off) : GridUnsafe.getShort(buf, off);
}
/** {@inheritDoc} */
@Override public int readUnsignedShort() throws IOException {
return readShort() & 0xffff;
}
/** {@inheritDoc} */
@Override public char readChar() throws IOException {
fromStream(2);
long off = BYTE_ARR_OFF + this.off;
char v = BIG_ENDIAN ? GridUnsafe.getCharLE(buf, off) : GridUnsafe.getChar(buf, off);
offset(2);
return v;
}
/** {@inheritDoc} */
@Override public int readInt() throws IOException {
fromStream(4);
long off = BYTE_ARR_OFF + offset(4);
return BIG_ENDIAN ? GridUnsafe.getIntLE(buf, off) : GridUnsafe.getInt(buf, off);
}
/** {@inheritDoc} */
@Override public long readLong() throws IOException {
fromStream(8);
long off = BYTE_ARR_OFF + offset(8);
return BIG_ENDIAN ? GridUnsafe.getLongLE(buf, off) : GridUnsafe.getLong(buf, off);
}
/** {@inheritDoc} */
@Override public float readFloat() throws IOException {
int v = readInt();
return Float.intBitsToFloat(v);
}
/** {@inheritDoc} */
@Override public double readDouble() throws IOException {
long v = readLong();
return Double.longBitsToDouble(v);
}
/** {@inheritDoc} */
@Override public int read() throws IOException {
try {
return readUnsignedByte();
}
catch (EOFException ignored) {
return -1;
}
}
/** {@inheritDoc} */
@Override public int read(byte b[], int off, int len) throws IOException {
if (b == null)
throw new NullPointerException();
if (off < 0 || len < 0 || len > b.length - off)
throw new IndexOutOfBoundsException();
if (len == 0)
return 0;
if (in != null)
return in.read(b, off, len);
else {
int toRead = Math.min(len, max - this.off);
System.arraycopy(buf, offset(toRead), b, off, toRead);
return toRead;
}
}
/** {@inheritDoc} */
@Override public String readLine() throws IOException {
SB sb = new SB();
int b;
while ((b = read()) >= 0) {
char c = (char)b;
switch (c) {
case '\n':
return sb.toString();
case '\r':
b = read();
if (b < 0 || b == '\n')
return sb.toString();
else
sb.a((char)b);
break;
default:
sb.a(c);
}
}
return sb.toString();
}
/** {@inheritDoc} */
@Override public String readUTF() throws IOException {
return readUTFBody(readInt());
}
/**
* Reads in the "body" (i.e., the UTF representation minus the 2-byte
* or 8-byte length header) of a UTF encoding, which occupies the next
* utfLen bytes.
*
* @param utfLen UTF encoding length.
* @return String.
* @throws IOException In case of error.
*/
private String readUTFBody(long utfLen) throws IOException {
StringBuilder sbuf = new StringBuilder();
end = pos = 0;
while (utfLen > 0) {
int avail = end - pos;
if (avail >= 3 || (long)avail == utfLen)
utfLen -= readUTFSpan(sbuf, utfLen);
else {
// shift and refill buffer manually
if (avail > 0)
System.arraycopy(utfBuf, pos, utfBuf, 0, avail);
pos = 0;
end = (int)Math.min(MAX_BLOCK_SIZE, utfLen);
readFully(utfBuf, avail, end - avail);
}
}
return sbuf.toString();
}
/**
* Reads span of UTF-encoded characters out of internal buffer
* (starting at offset pos and ending at or before offset end),
* consuming no more than utfLen bytes. Appends read characters to
* sbuf. Returns the number of bytes consumed.
*
* @param sbuf String builder.
* @param utfLen UTF encoding length.
* @return Number of bytes consumed.
* @throws IOException In case of error.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
private long readUTFSpan(StringBuilder sbuf, long utfLen) throws IOException {
int cpos = 0;
int start = pos;
int avail = Math.min(end - pos, CHAR_BUF_SIZE);
int stop = pos + ((utfLen > avail) ? avail - 2 : (int)utfLen);
boolean outOfBounds = false;
try {
while (pos < stop) {
int b1 = utfBuf[pos++] & 0xFF;
int b2, b3;
switch (b1 >> 4) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
// 1 byte format: 0xxxxxxx
urfCBuf[cpos++] = (char)b1;
break;
case 12:
case 13:
// 2 byte format: 110xxxxx 10xxxxxx
b2 = utfBuf[pos++];
if ((b2 & 0xC0) != 0x80)
throw new UTFDataFormatException();
urfCBuf[cpos++] = (char)(((b1 & 0x1F) << 6) | (b2 & 0x3F));
break;
case 14:
// 3 byte format: 1110xxxx 10xxxxxx 10xxxxxx
b3 = utfBuf[pos + 1];
b2 = utfBuf[pos];
pos += 2;
if ((b2 & 0xC0) != 0x80 || (b3 & 0xC0) != 0x80)
throw new UTFDataFormatException();
urfCBuf[cpos++] = (char)(((b1 & 0x0F) << 12) | ((b2 & 0x3F) << 6) | (b3 & 0x3F));
break;
default:
// 10xx xxxx, 1111 xxxx
throw new UTFDataFormatException();
}
}
}
catch (ArrayIndexOutOfBoundsException ignored) {
outOfBounds = true;
}
finally {
if (outOfBounds || (pos - start) > utfLen) {
pos = start + (int)utfLen;
throw new UTFDataFormatException();
}
}
sbuf.append(urfCBuf, 0, cpos);
return pos - start;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridUnsafeDataInput.class, this);
}
}