blob: eaaf8348840b539e8db35c3f15105a047f657653 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.network.internal.direct.stream;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.UUID;
import org.apache.ignite.internal.util.ArrayFactory;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageReader;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
import org.apache.ignite.network.serialization.MessageWriter;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import static org.apache.ignite.internal.util.ArrayUtils.BOOLEAN_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.CHAR_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.DOUBLE_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.FLOAT_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.INT_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_ARRAY;
import static org.apache.ignite.internal.util.ArrayUtils.SHORT_ARRAY;
import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF;
import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF;
/**
 * {@link DirectByteBufferStream} implementation that reads and writes values directly from/to a
 * {@link ByteBuffer} through {@link GridUnsafe}.
 * <p>
 * Every operation is resumable: when the buffer does not have enough room (write) or enough data (read),
 * the operation records its progress in the instance fields, sets {@link #lastFinished} to {@code false}
 * and returns. The caller is expected to supply a buffer via {@link #setBuffer(ByteBuffer)} and repeat
 * the same call to continue where it stopped.
 */
public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
    /** Poison object: marks "no current element" in {@link #arrCur}, {@link #mapCur} and {@link #cur}. */
    private static final Object NULL = new Object();

    /** Registry used to create serializers and deserializers for nested messages. */
    private final MessageSerializationRegistry serializationRegistry;

    /** Buffer currently being read from or written to. */
    private ByteBuffer buf;

    /** Backing array of {@link #buf} for heap buffers, {@code null} for direct buffers. */
    private byte[] heapArr;

    /** Base offset (heap buffers) or base address (direct buffers) that corresponds to buffer position 0. */
    private long baseOff;

    /** Number of bytes of the current primitive array already written, {@code -1} if its length is not written yet. */
    private int arrOff = -1;

    /** Primitive array that is currently being read, {@code null} if none is in progress. */
    private Object tmpArr;

    /** Number of bytes of {@link #tmpArr} that are already filled. */
    private int tmpArrOff;

    /** Number of bytes of the boundary value, read from previous message. */
    private int valReadBytes;

    /** Total size of {@link #tmpArr} in bytes. */
    private int tmpArrBytes;

    /** Whether the type of the nested message being read has already been consumed. */
    private boolean msgTypeDone;

    /** Deserializer of the nested message being read, {@code null} if the message is {@code null}. */
    private MessageDeserializer<NetworkMessage> msgDeserializer;

    /** Iterator over entries of the map being written. */
    private Iterator<?> mapIt;

    /** Iterator over elements of the collection being written. */
    private Iterator<?> it;

    /** Index of the next element of the object array / random access list being written, {@code -1} if size is not written. */
    private int arrPos = -1;

    /** Element of the object array / random access list being written, {@link #NULL} if none. */
    private Object arrCur = NULL;

    /** Map entry being written, or the last key read while reading a map; {@link #NULL} if none. */
    private Object mapCur = NULL;

    /** Collection element being written, {@link #NULL} if none. */
    private Object cur = NULL;

    /** Whether the key of the current map entry has already been written (or read). */
    private boolean keyDone;

    /** Size of the array / collection / map being read, {@code -1} if it is not read yet. */
    private int readSize = -1;

    /** Number of items of the array / collection / map read so far. */
    private int readItems;

    /** Partially read object array. */
    private Object[] objArr;

    /** Partially read collection. */
    private Collection<Object> col;

    /** Partially read map. */
    private Map<Object, Object> map;

    /** Accumulator for the variable-length integer being read. */
    private long prim;

    /** Number of 7-bit groups already accumulated in {@link #prim}. */
    private int primShift;

    /** State of the UUID / IgniteUuid read and write state machines. */
    private int uuidState;

    /** Most significant bits of the UUID being read. */
    private long uuidMost;

    /** Least significant bits of the UUID being read. */
    private long uuidLeast;

    /** Local ID of the IgniteUuid being read. */
    private long uuidLocId;

    /** Whether the last read or write operation was fully completed. */
    protected boolean lastFinished;

    /** Byte-array representation of the string being written; kept between calls until it is fully written. */
    private byte[] curStrBackingArr;

    /**
     * @param serializationRegistry Message mappers.
     */
    public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) {
        this.serializationRegistry = serializationRegistry;
    }

    /** {@inheritDoc} */
    @Override public void setBuffer(ByteBuffer buf) {
        assert buf != null;

        if (this.buf != buf) {
            this.buf = buf;

            if (buf.isDirect()) {
                heapArr = null;
                baseOff = GridUnsafe.bufferAddress(buf);
            }
            else {
                heapArr = buf.array();

                // Position 0 of a sliced heap buffer maps to index arrayOffset() of the backing array.
                baseOff = BYTE_ARR_OFF + buf.arrayOffset();
            }
        }
    }

    /** {@inheritDoc} */
    @Override public int remaining() {
        return buf.remaining();
    }

    /** {@inheritDoc} */
    @Override public boolean lastFinished() {
        return lastFinished;
    }

    /** {@inheritDoc} */
    @Override public void writeByte(byte val) {
        lastFinished = buf.remaining() >= 1;

        if (lastFinished) {
            int pos = buf.position();

            GridUnsafe.putByte(heapArr, baseOff + pos, val);

            buf.position(pos + 1);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeShort(short val) {
        lastFinished = buf.remaining() >= 2;

        if (lastFinished) {
            int pos = buf.position();

            long off = baseOff + pos;

            if (BIG_ENDIAN)
                GridUnsafe.putShortLE(heapArr, off, val);
            else
                GridUnsafe.putShort(heapArr, off, val);

            buf.position(pos + 2);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * The value is incremented by one (so that {@code -1} is stored as a single zero byte) and written as a
     * variable-length integer: 7 bits per byte, lowest group first, high bit set on every byte but the last.
     * Nothing is written unless at least 5 bytes (the maximum encoded size) are available.
     */
    @Override public void writeInt(int val) {
        lastFinished = buf.remaining() >= 5;

        if (lastFinished) {
            if (val == Integer.MAX_VALUE)
                val = Integer.MIN_VALUE;
            else
                val++;

            int pos = buf.position();

            while ((val & 0xFFFF_FF80) != 0) {
                byte b = (byte)(val | 0x80);

                GridUnsafe.putByte(heapArr, baseOff + pos++, b);

                val >>>= 7;
            }

            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);

            buf.position(pos);
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Uses the same incremented variable-length encoding as {@link #writeInt(int)}; nothing is written unless
     * at least 10 bytes (the maximum encoded size) are available.
     */
    @Override public void writeLong(long val) {
        lastFinished = buf.remaining() >= 10;

        if (lastFinished) {
            if (val == Long.MAX_VALUE)
                val = Long.MIN_VALUE;
            else
                val++;

            int pos = buf.position();

            while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
                byte b = (byte)(val | 0x80);

                GridUnsafe.putByte(heapArr, baseOff + pos++, b);

                val >>>= 7;
            }

            GridUnsafe.putByte(heapArr, baseOff + pos++, (byte)val);

            buf.position(pos);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeFloat(float val) {
        lastFinished = buf.remaining() >= 4;

        if (lastFinished) {
            int pos = buf.position();

            long off = baseOff + pos;

            if (BIG_ENDIAN)
                GridUnsafe.putFloatLE(heapArr, off, val);
            else
                GridUnsafe.putFloat(heapArr, off, val);

            buf.position(pos + 4);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeDouble(double val) {
        lastFinished = buf.remaining() >= 8;

        if (lastFinished) {
            int pos = buf.position();

            long off = baseOff + pos;

            if (BIG_ENDIAN)
                GridUnsafe.putDoubleLE(heapArr, off, val);
            else
                GridUnsafe.putDouble(heapArr, off, val);

            buf.position(pos + 8);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeChar(char val) {
        lastFinished = buf.remaining() >= 2;

        if (lastFinished) {
            int pos = buf.position();

            long off = baseOff + pos;

            if (BIG_ENDIAN)
                GridUnsafe.putCharLE(heapArr, off, val);
            else
                GridUnsafe.putChar(heapArr, off, val);

            buf.position(pos + 2);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeBoolean(boolean val) {
        lastFinished = buf.remaining() >= 1;

        if (lastFinished) {
            int pos = buf.position();

            GridUnsafe.putBoolean(heapArr, baseOff + pos, val);

            buf.position(pos + 1);
        }
    }

    /** {@inheritDoc} */
    @Override public void writeByteArray(byte[] val) {
        if (val != null)
            lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length);
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeByteArray(byte[] val, long off, int len) {
        if (val != null)
            lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len);
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeShortArray(short[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, SHORT_ARR_OFF, val.length, 2, 1);
            else
                lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeIntArray(int[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, INT_ARR_OFF, val.length, 4, 2);
            else
                lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeLongArray(long[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, LONG_ARR_OFF, val.length, 8, 3);
            else
                lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeLongArray(long[] val, int len) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, LONG_ARR_OFF, len, 8, 3);
            else
                lastFinished = writeArray(val, LONG_ARR_OFF, len, len << 3);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeFloatArray(float[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, FLOAT_ARR_OFF, val.length, 4, 2);
            else
                lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeDoubleArray(double[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, DOUBLE_ARR_OFF, val.length, 8, 3);
            else
                lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeCharArray(char[] val) {
        if (val != null) {
            if (BIG_ENDIAN)
                lastFinished = writeArrayLE(val, CHAR_ARR_OFF, val.length, 2, 1);
            else
                lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1);
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public void writeBooleanArray(boolean[] val) {
        if (val != null)
            lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length);
        else
            writeInt(-1);
    }

    /**
     * {@inheritDoc}
     * <p>
     * The string is written as a byte array holding its UTF-8 encoding. A fixed charset is used so that
     * the result does not depend on the default charset of the writing or the reading JVM.
     */
    @Override public void writeString(String val) {
        if (val != null) {
            // Encode only once; the same bytes are reused if the write has to be resumed.
            if (curStrBackingArr == null)
                curStrBackingArr = val.getBytes(StandardCharsets.UTF_8);

            writeByteArray(curStrBackingArr);

            if (lastFinished)
                curStrBackingArr = null;
        }
        else
            writeByteArray(null);
    }

    /** {@inheritDoc} */
    @Override public void writeBitSet(BitSet val) {
        writeLongArray(val != null ? val.toLongArray() : null);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Written as a boolean "is null" flag followed, for non-null values, by the most and least significant bits.
     */
    @Override public void writeUuid(UUID val) {
        switch (uuidState) {
            case 0:
                writeBoolean(val == null);

                if (!lastFinished || val == null)
                    return;

                uuidState++;

                //noinspection fallthrough
            case 1:
                writeLong(val.getMostSignificantBits());

                if (!lastFinished)
                    return;

                uuidState++;

                //noinspection fallthrough
            case 2:
                writeLong(val.getLeastSignificantBits());

                if (!lastFinished)
                    return;

                uuidState = 0;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Written as a boolean "is null" flag followed, for non-null values, by the most and least significant bits
     * of the global ID and then the local ID.
     */
    @Override public void writeIgniteUuid(IgniteUuid val) {
        switch (uuidState) {
            case 0:
                writeBoolean(val == null);

                if (!lastFinished || val == null)
                    return;

                uuidState++;

                //noinspection fallthrough
            case 1:
                writeLong(val.globalId().getMostSignificantBits());

                if (!lastFinished)
                    return;

                uuidState++;

                //noinspection fallthrough
            case 2:
                writeLong(val.globalId().getLeastSignificantBits());

                if (!lastFinished)
                    return;

                uuidState++;

                //noinspection fallthrough
            case 3:
                writeLong(val.localId());

                if (!lastFinished)
                    return;

                uuidState = 0;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * A {@code null} message is written as the short value {@link Short#MIN_VALUE}; a non-null message is
     * delegated to the serializer created by the registry for the message's direct type.
     */
    @Override public void writeMessage(NetworkMessage msg, MessageWriter writer) {
        if (msg != null) {
            if (buf.hasRemaining()) {
                try {
                    writer.beforeInnerMessageWrite();

                    writer.setCurrentWriteClass(msg.getClass());

                    MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());

                    writer.setBuffer(buf);

                    lastFinished = serializer.writeMessage(msg, writer);
                }
                finally {
                    writer.afterInnerMessageWrite(lastFinished);
                }
            }
            else
                lastFinished = false;
        }
        else
            writeShort(Short.MIN_VALUE);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the array length followed by each element; {@code -1} is written as the length of a {@code null} array.
     */
    @Override public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType,
        MessageWriter writer) {
        if (arr != null) {
            int len = arr.length;

            if (arrPos == -1) {
                writeInt(len);

                if (!lastFinished)
                    return;

                arrPos = 0;
            }

            // arrCur != NULL means the previous call stopped in the middle of that element.
            while (arrPos < len || arrCur != NULL) {
                if (arrCur == NULL)
                    arrCur = arr[arrPos++];

                write(itemType, arrCur, writer);

                if (!lastFinished)
                    return;

                arrCur = NULL;
            }

            arrPos = -1;
        }
        else
            writeInt(-1);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the collection size followed by each element; {@code -1} is written as the size of a {@code null}
     * collection. Random access lists are written by index, other collections through an iterator.
     */
    @Override public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType,
        MessageWriter writer) {
        if (col != null) {
            if (col instanceof List && col instanceof RandomAccess)
                writeRandomAccessList((List<T>)col, itemType, writer);
            else {
                if (it == null) {
                    writeInt(col.size());

                    if (!lastFinished)
                        return;

                    it = col.iterator();
                }

                // cur != NULL means the previous call stopped in the middle of that element.
                while (it.hasNext() || cur != NULL) {
                    if (cur == NULL)
                        cur = it.next();

                    write(itemType, cur, writer);

                    if (!lastFinished)
                        return;

                    cur = NULL;
                }

                it = null;
            }
        }
        else
            writeInt(-1);
    }

    /**
     * Writes the size of a random access list followed by its elements, accessed by index.
     *
     * @param list List.
     * @param itemType Component type.
     * @param writer Writer.
     */
    private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) {
        assert list instanceof RandomAccess;

        int size = list.size();

        if (arrPos == -1) {
            writeInt(size);

            if (!lastFinished)
                return;

            arrPos = 0;
        }

        while (arrPos < size || arrCur != NULL) {
            if (arrCur == NULL)
                arrCur = list.get(arrPos++);

            write(itemType, arrCur, writer);

            if (!lastFinished)
                return;

            arrCur = NULL;
        }

        arrPos = -1;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Writes the map size followed by key/value pairs; {@code -1} is written as the size of a {@code null} map.
     */
    @Override public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType,
        MessageCollectionItemType valType, MessageWriter writer) {
        if (map != null) {
            if (mapIt == null) {
                writeInt(map.size());

                if (!lastFinished)
                    return;

                mapIt = map.entrySet().iterator();
            }

            // mapCur != NULL means the previous call stopped in the middle of that entry.
            while (mapIt.hasNext() || mapCur != NULL) {
                if (mapCur == NULL)
                    mapCur = mapIt.next();

                Map.Entry<K, V> e = (Map.Entry<K, V>)mapCur;

                if (!keyDone) {
                    write(keyType, e.getKey(), writer);

                    if (!lastFinished)
                        return;

                    keyDone = true;
                }

                write(valType, e.getValue(), writer);

                if (!lastFinished)
                    return;

                mapCur = NULL;
                keyDone = false;
            }

            mapIt = null;
        }
        else
            writeInt(-1);
    }

    /** {@inheritDoc} */
    @Override public byte readByte() {
        lastFinished = buf.remaining() >= 1;

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 1);

            return GridUnsafe.getByte(heapArr, baseOff + pos);
        }
        else
            return 0;
    }

    /** {@inheritDoc} */
    @Override public short readShort() {
        lastFinished = buf.remaining() >= 2;

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 2);

            long off = baseOff + pos;

            return BIG_ENDIAN ? GridUnsafe.getShortLE(heapArr, off) : GridUnsafe.getShort(heapArr, off);
        }
        else
            return 0;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Decodes the variable-length format produced by {@link #writeInt(int)}. Bytes consumed by an unfinished
     * call stay accumulated in {@link #prim} / {@link #primShift}, so decoding continues on the next call.
     */
    @Override public int readInt() {
        lastFinished = false;

        int val = 0;

        while (buf.hasRemaining()) {
            int pos = buf.position();

            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);

            buf.position(pos + 1);

            prim |= ((long)b & 0x7F) << (7 * primShift);

            if ((b & 0x80) == 0) {
                lastFinished = true;

                val = (int)prim;

                // Undo the increment applied by the writer.
                if (val == Integer.MIN_VALUE)
                    val = Integer.MAX_VALUE;
                else
                    val--;

                prim = 0;
                primShift = 0;

                break;
            }
            else
                primShift++;
        }

        return val;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Decodes the variable-length format produced by {@link #writeLong(long)}; resumable in the same way as
     * {@link #readInt()}.
     */
    @Override public long readLong() {
        lastFinished = false;

        long val = 0;

        while (buf.hasRemaining()) {
            int pos = buf.position();

            byte b = GridUnsafe.getByte(heapArr, baseOff + pos);

            buf.position(pos + 1);

            prim |= ((long)b & 0x7F) << (7 * primShift);

            if ((b & 0x80) == 0) {
                lastFinished = true;

                val = prim;

                // Undo the increment applied by the writer.
                if (val == Long.MIN_VALUE)
                    val = Long.MAX_VALUE;
                else
                    val--;

                prim = 0;
                primShift = 0;

                break;
            }
            else
                primShift++;
        }

        return val;
    }

    /** {@inheritDoc} */
    @Override public float readFloat() {
        lastFinished = buf.remaining() >= 4;

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 4);

            long off = baseOff + pos;

            return BIG_ENDIAN ? GridUnsafe.getFloatLE(heapArr, off) : GridUnsafe.getFloat(heapArr, off);
        }
        else
            return 0;
    }

    /** {@inheritDoc} */
    @Override public double readDouble() {
        lastFinished = buf.remaining() >= 8;

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 8);

            long off = baseOff + pos;

            return BIG_ENDIAN ? GridUnsafe.getDoubleLE(heapArr, off) : GridUnsafe.getDouble(heapArr, off);
        }
        else
            return 0;
    }

    /** {@inheritDoc} */
    @Override public char readChar() {
        lastFinished = buf.remaining() >= 2;

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 2);

            long off = baseOff + pos;

            return BIG_ENDIAN ? GridUnsafe.getCharLE(heapArr, off) : GridUnsafe.getChar(heapArr, off);
        }
        else
            return 0;
    }

    /** {@inheritDoc} */
    @Override public boolean readBoolean() {
        lastFinished = buf.hasRemaining();

        if (lastFinished) {
            int pos = buf.position();

            buf.position(pos + 1);

            return GridUnsafe.getBoolean(heapArr, baseOff + pos);
        }
        else
            return false;
    }

    /** {@inheritDoc} */
    @Override public byte[] readByteArray() {
        return readArray(BYTE_ARRAY, 0, BYTE_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public short[] readShortArray() {
        if (BIG_ENDIAN)
            return readArrayLE(SHORT_ARRAY, 2, 1, SHORT_ARR_OFF);
        else
            return readArray(SHORT_ARRAY, 1, SHORT_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public int[] readIntArray() {
        if (BIG_ENDIAN)
            return readArrayLE(INT_ARRAY, 4, 2, INT_ARR_OFF);
        else
            return readArray(INT_ARRAY, 2, INT_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public long[] readLongArray() {
        if (BIG_ENDIAN)
            return readArrayLE(LONG_ARRAY, 8, 3, LONG_ARR_OFF);
        else
            return readArray(LONG_ARRAY, 3, LONG_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public float[] readFloatArray() {
        if (BIG_ENDIAN)
            return readArrayLE(FLOAT_ARRAY, 4, 2, FLOAT_ARR_OFF);
        else
            return readArray(FLOAT_ARRAY, 2, FLOAT_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public double[] readDoubleArray() {
        if (BIG_ENDIAN)
            return readArrayLE(DOUBLE_ARRAY, 8, 3, DOUBLE_ARR_OFF);
        else
            return readArray(DOUBLE_ARRAY, 3, DOUBLE_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public char[] readCharArray() {
        if (BIG_ENDIAN)
            return readArrayLE(CHAR_ARRAY, 2, 1, CHAR_ARR_OFF);
        else
            return readArray(CHAR_ARRAY, 1, CHAR_ARR_OFF);
    }

    /** {@inheritDoc} */
    @Override public boolean[] readBooleanArray() {
        return readArray(BOOLEAN_ARRAY, 0, GridUnsafe.BOOLEAN_ARR_OFF);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Decodes the byte array written by {@link #writeString(String)} as UTF-8.
     */
    @Override public String readString() {
        byte[] arr = readByteArray();

        return arr != null ? new String(arr, StandardCharsets.UTF_8) : null;
    }

    /** {@inheritDoc} */
    @Override public BitSet readBitSet() {
        long[] arr = readLongArray();

        return arr != null ? BitSet.valueOf(arr) : null;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code null} both for a {@code null} UUID and for an unfinished read; check
     * {@link #lastFinished()} to tell them apart.
     */
    @Override public UUID readUuid() {
        switch (uuidState) {
            case 0:
                boolean isNull = readBoolean();

                if (!lastFinished || isNull)
                    return null;

                uuidState++;

                //noinspection fallthrough
            case 1:
                uuidMost = readLong();

                if (!lastFinished)
                    return null;

                uuidState++;

                //noinspection fallthrough
            case 2:
                uuidLeast = readLong();

                if (!lastFinished)
                    return null;

                uuidState = 0;
        }

        UUID val = new UUID(uuidMost, uuidLeast);

        uuidMost = 0;
        uuidLeast = 0;

        return val;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code null} both for a {@code null} value and for an unfinished read; check
     * {@link #lastFinished()} to tell them apart.
     */
    @Override public IgniteUuid readIgniteUuid() {
        switch (uuidState) {
            case 0:
                boolean isNull = readBoolean();

                if (!lastFinished || isNull)
                    return null;

                uuidState++;

                //noinspection fallthrough
            case 1:
                uuidMost = readLong();

                if (!lastFinished)
                    return null;

                uuidState++;

                //noinspection fallthrough
            case 2:
                uuidLeast = readLong();

                if (!lastFinished)
                    return null;

                uuidState++;

                //noinspection fallthrough
            case 3:
                uuidLocId = readLong();

                if (!lastFinished)
                    return null;

                uuidState = 0;
        }

        IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId);

        uuidMost = 0;
        uuidLeast = 0;
        uuidLocId = 0;

        return val;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the message type first (a type equal to {@link Short#MIN_VALUE} denotes a {@code null} message),
     * then delegates to the deserializer created by the registry for that type.
     */
    @Override public <T extends NetworkMessage> T readMessage(MessageReader reader) {
        if (!msgTypeDone) {
            if (buf.remaining() < NetworkMessage.DIRECT_TYPE_SIZE) {
                lastFinished = false;

                return null;
            }

            short type = readShort();

            msgDeserializer = type == Short.MIN_VALUE ? null : serializationRegistry.createDeserializer(type);

            msgTypeDone = true;
        }

        if (msgDeserializer != null) {
            try {
                reader.beforeInnerMessageRead();

                reader.setCurrentReadClass(msgDeserializer.klass());

                reader.setBuffer(buf);

                lastFinished = msgDeserializer.readMessage(reader);
            }
            finally {
                reader.afterInnerMessageRead(lastFinished);
            }
        }
        else
            lastFinished = true;

        if (lastFinished) {
            NetworkMessage msg0 = msgDeserializer != null ? msgDeserializer.getMessage() : null;

            msgTypeDone = false;
            msgDeserializer = null;

            return (T)msg0;
        }
        else
            return null;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code null} both for a {@code null} array (negative size on the wire) and for an unfinished read.
     */
    @Override public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls,
        MessageReader reader) {
        if (readSize == -1) {
            int size = readInt();

            if (!lastFinished)
                return null;

            readSize = size;
        }

        if (readSize >= 0) {
            if (objArr == null)
                objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];

            for (int i = readItems; i < readSize; i++) {
                Object item = read(itemType, reader);

                if (!lastFinished)
                    return null;

                objArr[i] = item;

                readItems++;
            }
        }

        readSize = -1;
        readItems = 0;
        // Reset to the sentinel (not null) so that the write-side "cur != NULL" check stays valid.
        cur = NULL;

        T[] objArr0 = (T[])objArr;

        objArr = null;

        return objArr0;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Elements are collected into an {@link ArrayList}. Returns {@code null} both for a {@code null} collection
     * (negative size on the wire) and for an unfinished read.
     */
    @Override public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType,
        MessageReader reader) {
        if (readSize == -1) {
            int size = readInt();

            if (!lastFinished)
                return null;

            readSize = size;
        }

        if (readSize >= 0) {
            if (col == null)
                col = new ArrayList<>(readSize);

            for (int i = readItems; i < readSize; i++) {
                Object item = read(itemType, reader);

                if (!lastFinished)
                    return null;

                col.add(item);

                readItems++;
            }
        }

        readSize = -1;
        readItems = 0;
        // Reset to the sentinel (not null) so that the write-side "cur != NULL" check stays valid.
        cur = NULL;

        C col0 = (C)col;

        col = null;

        return col0;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns {@code null} both for a {@code null} map (negative size on the wire) and for an unfinished read.
     */
    @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
        MessageCollectionItemType valType, boolean linked, MessageReader reader) {
        if (readSize == -1) {
            int size = readInt();

            if (!lastFinished)
                return null;

            readSize = size;
        }

        if (readSize >= 0) {
            if (map == null)
                map = linked ? IgniteUtils.newLinkedHashMap(readSize) : IgniteUtils.newHashMap(readSize);

            for (int i = readItems; i < readSize; i++) {
                if (!keyDone) {
                    Object key = read(keyType, reader);

                    if (!lastFinished)
                        return null;

                    // Remember the key in case reading of the value has to be resumed later.
                    mapCur = key;
                    keyDone = true;
                }

                Object val = read(valType, reader);

                if (!lastFinished)
                    return null;

                map.put(mapCur, val);

                keyDone = false;

                readItems++;
            }
        }

        readSize = -1;
        readItems = 0;
        // Reset to the sentinel (not null) so that the write-side "mapCur != NULL" check stays valid.
        mapCur = NULL;

        M map0 = (M)map;

        map = null;

        return map0;
    }

    /**
     * Writes the length of a primitive array (once) and then copies as many of its bytes as fit into the buffer.
     * Progress is kept in {@link #arrOff} so that a repeated call continues after the bytes already written.
     *
     * @param arr Array.
     * @param off Offset.
     * @param len Length.
     * @param bytes Length in bytes.
     * @return Whether array was fully written.
     */
    boolean writeArray(Object arr, long off, int len, int bytes) {
        assert arr != null;
        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
        assert off > 0;
        assert len >= 0;
        assert bytes >= 0;
        assert bytes >= arrOff;

        if (writeArrayLength(len))
            return false;

        int toWrite = bytes - arrOff;
        int pos = buf.position();
        int remaining = buf.remaining();

        if (toWrite <= remaining) {
            if (toWrite > 0) {
                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);

                buf.position(pos + toWrite);
            }

            arrOff = -1;

            return true;
        }
        else {
            if (remaining > 0) {
                GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);

                buf.position(pos + remaining);

                arrOff += remaining;
            }

            return false;
        }
    }

    /**
     * Writes the length of a primitive array (once) and then as many whole elements as fit into the buffer,
     * reversing the byte order of every element.
     *
     * @param arr Array.
     * @param off Offset.
     * @param len Length.
     * @param typeSize Primitive type size in bytes. Needs for byte reverse.
     * @param shiftCnt Shift for length.
     * @return Whether array was fully written.
     */
    boolean writeArrayLE(Object arr, long off, int len, int typeSize, int shiftCnt) {
        assert arr != null;
        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
        assert off > 0;
        assert len >= 0;

        int bytes = len << shiftCnt;

        assert bytes >= arrOff;

        if (writeArrayLength(len))
            return false;

        // Both values are counted in elements, not bytes: only whole elements are written.
        int toWrite = (bytes - arrOff) >> shiftCnt;
        int remaining = buf.remaining() >> shiftCnt;

        if (toWrite <= remaining) {
            writeArrayLE(arr, off, toWrite, typeSize);

            arrOff = -1;

            return true;
        }
        else {
            if (remaining > 0)
                writeArrayLE(arr, off, remaining, typeSize);

            return false;
        }
    }

    /**
     * Writes {@code len} elements starting at the current {@link #arrOff}, emitting the bytes of every element
     * in reverse order and advancing {@link #arrOff} and the buffer position accordingly.
     *
     * @param arr Array.
     * @param off Offset.
     * @param len Length.
     * @param typeSize Primitive type size in bytes.
     */
    private void writeArrayLE(Object arr, long off, int len, int typeSize) {
        int pos = buf.position();

        for (int i = 0; i < len; i++) {
            for (int j = 0; j < typeSize; j++) {
                byte b = GridUnsafe.getByteField(arr, off + arrOff + (typeSize - j - 1));

                GridUnsafe.putByte(heapArr, baseOff + pos++, b);
            }

            buf.position(pos);

            arrOff += typeSize;
        }
    }

    /**
     * Writes the array length if it has not been written yet for the current array.
     *
     * @param len Length.
     * @return {@code true} if the length could not be written (the caller must stop), {@code false} otherwise.
     */
    private boolean writeArrayLength(int len) {
        if (arrOff == -1) {
            writeInt(len);

            if (!lastFinished)
                return true;

            arrOff = 0;
        }

        return false;
    }

    /**
     * Reads a primitive array: the length first (once), then as many bytes as are available. The partially
     * read array is kept in {@link #tmpArr} between calls.
     *
     * @param <T> Type of an array.
     * @param creator Array creator.
     * @param lenShift Array length shift size.
     * @param off Base offset.
     * @return Array, or {@code null} if the array is {@code null} (length {@code -1}) or was not fully read.
     */
    <T> T readArray(ArrayFactory<T> creator, int lenShift, long off) {
        assert creator != null;

        if (tmpArr == null) {
            int len = readInt();

            if (!lastFinished)
                return null;

            switch (len) {
                case -1:
                    lastFinished = true;

                    return null;

                case 0:
                    lastFinished = true;

                    return creator.of(0);

                default:
                    tmpArr = creator.of(len);
                    tmpArrBytes = len << lenShift;
            }
        }

        int toRead = tmpArrBytes - tmpArrOff;
        int remaining = buf.remaining();
        int pos = buf.position();

        lastFinished = toRead <= remaining;

        if (lastFinished) {
            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);

            buf.position(pos + toRead);

            T arr = (T)tmpArr;

            tmpArr = null;
            tmpArrBytes = 0;
            tmpArrOff = 0;

            return arr;
        }
        else {
            GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);

            buf.position(pos + remaining);

            tmpArrOff += remaining;

            return null;
        }
    }

    /**
     * Reads a primitive array byte by byte, reversing the byte order of every element. An element split across
     * two buffers is tracked with {@link #valReadBytes}.
     *
     * @param <T> Type of an array.
     * @param creator Array creator.
     * @param typeSize Primitive type size in bytes.
     * @param lenShift Array length shift size.
     * @param off Base offset.
     * @return Array, or {@code null} if the array is {@code null} (length {@code -1}) or was not fully read.
     */
    <T> T readArrayLE(ArrayFactory<T> creator, int typeSize, int lenShift, long off) {
        assert creator != null;

        if (tmpArr == null) {
            int len = readInt();

            if (!lastFinished)
                return null;

            switch (len) {
                case -1:
                    lastFinished = true;

                    return null;

                case 0:
                    lastFinished = true;

                    return creator.of(0);

                default:
                    tmpArr = creator.of(len);
                    tmpArrBytes = len << lenShift;
            }
        }

        int toRead = tmpArrBytes - tmpArrOff - valReadBytes;
        int remaining = buf.remaining();

        lastFinished = toRead <= remaining;

        if (!lastFinished)
            toRead = remaining;

        int pos = buf.position();

        for (int i = 0; i < toRead; i++) {
            byte b = GridUnsafe.getByte(heapArr, baseOff + pos + i);

            GridUnsafe.putByteField(tmpArr, off + tmpArrOff + (typeSize - valReadBytes - 1), b);

            // Move on to the next element once all bytes of the current one are read.
            if (++valReadBytes == typeSize) {
                valReadBytes = 0;
                tmpArrOff += typeSize;
            }
        }

        buf.position(pos + toRead);

        if (lastFinished) {
            T arr = (T)tmpArr;

            tmpArr = null;
            tmpArrBytes = 0;
            tmpArrOff = 0;

            return arr;
        }
        else
            return null;
    }

    /**
     * Writes a single value of the given item type by dispatching to the matching {@code writeXxx} method.
     *
     * @param type Type.
     * @param val Value.
     * @param writer Writer.
     * @throws IllegalArgumentException If the type is unknown.
     */
    protected void write(MessageCollectionItemType type, Object val, MessageWriter writer) {
        switch (type) {
            case BYTE:
                writeByte((Byte)val);

                break;

            case SHORT:
                writeShort((Short)val);

                break;

            case INT:
                writeInt((Integer)val);

                break;

            case LONG:
                writeLong((Long)val);

                break;

            case FLOAT:
                writeFloat((Float)val);

                break;

            case DOUBLE:
                writeDouble((Double)val);

                break;

            case CHAR:
                writeChar((Character)val);

                break;

            case BOOLEAN:
                writeBoolean((Boolean)val);

                break;

            case BYTE_ARR:
                writeByteArray((byte[])val);

                break;

            case SHORT_ARR:
                writeShortArray((short[])val);

                break;

            case INT_ARR:
                writeIntArray((int[])val);

                break;

            case LONG_ARR:
                writeLongArray((long[])val);

                break;

            case FLOAT_ARR:
                writeFloatArray((float[])val);

                break;

            case DOUBLE_ARR:
                writeDoubleArray((double[])val);

                break;

            case CHAR_ARR:
                writeCharArray((char[])val);

                break;

            case BOOLEAN_ARR:
                writeBooleanArray((boolean[])val);

                break;

            case STRING:
                writeString((String)val);

                break;

            case BIT_SET:
                writeBitSet((BitSet)val);

                break;

            case UUID:
                writeUuid((UUID)val);

                break;

            case IGNITE_UUID:
                writeIgniteUuid((IgniteUuid)val);

                break;

            case MSG:
                // NOTE(review): writeMessage() also calls beforeInnerMessageWrite()/afterInnerMessageWrite()
                // when the buffer has room, so the writer state is nested twice here - confirm this is intended.
                try {
                    if (val != null)
                        writer.beforeInnerMessageWrite();

                    writeMessage((NetworkMessage)val, writer);
                }
                finally {
                    if (val != null)
                        writer.afterInnerMessageWrite(lastFinished);
                }

                break;

            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    /**
     * Reads a single value of the given item type by dispatching to the matching {@code readXxx} method.
     *
     * @param type Type.
     * @param reader Reader.
     * @return Value.
     * @throws IllegalArgumentException If the type is unknown.
     */
    protected Object read(MessageCollectionItemType type, MessageReader reader) {
        switch (type) {
            case BYTE:
                return readByte();

            case SHORT:
                return readShort();

            case INT:
                return readInt();

            case LONG:
                return readLong();

            case FLOAT:
                return readFloat();

            case DOUBLE:
                return readDouble();

            case CHAR:
                return readChar();

            case BOOLEAN:
                return readBoolean();

            case BYTE_ARR:
                return readByteArray();

            case SHORT_ARR:
                return readShortArray();

            case INT_ARR:
                return readIntArray();

            case LONG_ARR:
                return readLongArray();

            case FLOAT_ARR:
                return readFloatArray();

            case DOUBLE_ARR:
                return readDoubleArray();

            case CHAR_ARR:
                return readCharArray();

            case BOOLEAN_ARR:
                return readBooleanArray();

            case STRING:
                return readString();

            case BIT_SET:
                return readBitSet();

            case UUID:
                return readUuid();

            case IGNITE_UUID:
                return readIgniteUuid();

            case MSG:
                return readMessage(reader);

            default:
                throw new IllegalArgumentException("Unknown type: " + type);
        }
    }
}