| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.network.direct.stream; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.ignite.internal.util.ArrayUtils.BOOLEAN_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.BYTE_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.CHAR_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.DOUBLE_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.FLOAT_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.INT_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.LONG_ARRAY; |
| import static org.apache.ignite.internal.util.ArrayUtils.SHORT_ARRAY; |
| import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.DOUBLE_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.FLOAT_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.INT_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.IS_BIG_ENDIAN; |
| import static org.apache.ignite.internal.util.GridUnsafe.LONG_ARR_OFF; |
| import static org.apache.ignite.internal.util.GridUnsafe.SHORT_ARR_OFF; |
| |
| import java.lang.reflect.Array; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.RandomAccess; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.function.IntFunction; |
| import java.util.function.Supplier; |
| import org.apache.ignite.internal.util.ArrayFactory; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.IgniteUtils; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.network.NetworkMessage; |
| import org.apache.ignite.network.serialization.MessageDeserializer; |
| import org.apache.ignite.network.serialization.MessageReader; |
| import org.apache.ignite.network.serialization.MessageSerializationRegistry; |
| import org.apache.ignite.network.serialization.MessageSerializer; |
| import org.apache.ignite.network.serialization.MessageWriter; |
| import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * {@link DirectByteBufferStream} implementation. |
| */ |
| public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream { |
| /** Poison object. */ |
| private static final Object NULL = new Object(); |
| |
| /** Flag that indicates that byte buffer is not null. */ |
| protected static final byte BYTE_BUFFER_NOT_NULL_FLAG = 1; |
| |
| /** Flag that indicates that byte buffer has Big Endinan order. */ |
| protected static final byte BYTE_BUFFER_BIG_ENDIAN_FLAG = 2; |
| |
| /** Message serialization registry. */ |
| private final MessageSerializationRegistry serializationRegistry; |
| |
| protected ByteBuffer buf; |
| |
| protected byte[] heapArr; |
| |
| protected long baseOff; |
| |
| private int arrOff = -1; |
| |
| private Object tmpArr; |
| |
| private int tmpArrOff; |
| |
| /** Number of bytes of the boundary value, read from previous message. */ |
| private int valReadBytes; |
| |
| private int tmpArrBytes; |
| |
| /** |
| * When {@code true}, this flag indicates that {@link #msgGroupType} contains a valid part of the currently read message header. {@code |
| * false} means that {@link #msgGroupType} might contain some leftover data from previous reads and can be discarded. |
| */ |
| private boolean msgGroupTypeRead; |
| |
| /** |
| * Group type of the message that is currently being received. |
| * |
| * <p>This field saves the partial message header, because it is not received in one piece, but rather in two: message group type and |
| * message type. |
| */ |
| private short msgGroupType; |
| |
| /** |
| * Flag needed for reading boxed primitives. |
| * |
| * <p>Boxed primitives are encoded as a boolean flag ({@code false} meaning that the boxed value is {@code null}), followed by the |
| * unboxed value (if not null). Therefore, this flag value must be cached between two unsuccessful read calls in case the received boxed |
| * primitive was not {@code null}. |
| */ |
| private boolean boxedTypeNotNull; |
| |
| @Nullable |
| private MessageDeserializer<NetworkMessage> msgDeserializer; |
| |
| @Nullable |
| private MessageSerializer<NetworkMessage> msgSerializer; |
| |
| private Iterator<?> mapIt; |
| |
| private Iterator<?> it; |
| |
| private int arrPos = -1; |
| |
| private Object arrCur = NULL; |
| |
| private Object mapCur = NULL; |
| |
| private Object cur = NULL; |
| |
| private boolean keyDone; |
| |
| private int readSize = -1; |
| |
| private int readItems; |
| |
| private Object[] objArr; |
| |
| private Collection<Object> col; |
| |
| private Map<Object, Object> map; |
| |
| private long prim; |
| |
| private int primShift; |
| |
| private int uuidState; |
| |
| private long uuidMost; |
| |
| private long uuidLeast; |
| |
| private long uuidLocId; |
| |
| private int byteBufferState; |
| |
| private byte byteBufferFlag; |
| |
| protected boolean lastFinished; |
| |
| /** byte-array representation of string. */ |
| private byte[] curStrBackingArr; |
| |
| /** |
| * Constructor. |
| * |
| * @param serializationRegistry Serialization service. . |
| */ |
| public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) { |
| this.serializationRegistry = serializationRegistry; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void setBuffer(ByteBuffer buf) { |
| assert buf != null; |
| |
| if (this.buf != buf) { |
| this.buf = buf; |
| |
| heapArr = buf.isDirect() ? null : buf.array(); |
| baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) : BYTE_ARR_OFF; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int remaining() { |
| return buf.remaining(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean lastFinished() { |
| return lastFinished; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeByte(byte val) { |
| lastFinished = buf.remaining() >= 1; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos, val); |
| |
| buf.position(pos + 1); |
| } |
| } |
| |
| @Override |
| public void writeBoxedByte(@Nullable Byte val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 1; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeByte(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeShort(short val) { |
| lastFinished = buf.remaining() >= 2; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| long off = baseOff + pos; |
| |
| if (IS_BIG_ENDIAN) { |
| GridUnsafe.putShortLittleEndian(heapArr, off, val); |
| } else { |
| GridUnsafe.putShort(heapArr, off, val); |
| } |
| |
| buf.position(pos + 2); |
| } |
| } |
| |
| @Override |
| public void writeBoxedShort(@Nullable Short val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 2; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeShort(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeInt(int val) { |
| lastFinished = buf.remaining() >= 5; |
| |
| if (lastFinished) { |
| val++; |
| |
| int pos = buf.position(); |
| |
| while ((val & 0xFFFF_FF80) != 0) { |
| byte b = (byte) (val | 0x80); |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos++, b); |
| |
| val >>>= 7; |
| } |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos++, (byte) val); |
| |
| buf.position(pos); |
| } |
| } |
| |
| @Override |
| public void writeBoxedInt(@Nullable Integer val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 5; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeInt(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeLong(long val) { |
| lastFinished = buf.remaining() >= 10; |
| |
| if (lastFinished) { |
| val++; |
| |
| int pos = buf.position(); |
| |
| while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) { |
| byte b = (byte) (val | 0x80); |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos++, b); |
| |
| val >>>= 7; |
| } |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos++, (byte) val); |
| |
| buf.position(pos); |
| } |
| } |
| |
| @Override |
| public void writeBoxedLong(@Nullable Long val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 10; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeLong(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeFloat(float val) { |
| lastFinished = buf.remaining() >= 4; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| long off = baseOff + pos; |
| |
| if (IS_BIG_ENDIAN) { |
| GridUnsafe.putFloatLittleEndian(heapArr, off, val); |
| } else { |
| GridUnsafe.putFloat(heapArr, off, val); |
| } |
| |
| buf.position(pos + 4); |
| } |
| } |
| |
| @Override |
| public void writeBoxedFloat(@Nullable Float val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 4; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeFloat(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeDouble(double val) { |
| lastFinished = buf.remaining() >= 8; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| long off = baseOff + pos; |
| |
| if (IS_BIG_ENDIAN) { |
| GridUnsafe.putDoubleLittleEndian(heapArr, off, val); |
| } else { |
| GridUnsafe.putDouble(heapArr, off, val); |
| } |
| |
| buf.position(pos + 8); |
| } |
| } |
| |
| @Override |
| public void writeBoxedDouble(@Nullable Double val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 8; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeDouble(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeChar(char val) { |
| lastFinished = buf.remaining() >= 2; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| long off = baseOff + pos; |
| |
| if (IS_BIG_ENDIAN) { |
| GridUnsafe.putCharLittleEndian(heapArr, off, val); |
| } else { |
| GridUnsafe.putChar(heapArr, off, val); |
| } |
| |
| buf.position(pos + 2); |
| } |
| } |
| |
| @Override |
| public void writeBoxedChar(@Nullable Character val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 2; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeChar(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeBoolean(boolean val) { |
| lastFinished = buf.remaining() >= 1; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| GridUnsafe.putBoolean(heapArr, baseOff + pos, val); |
| |
| buf.position(pos + 1); |
| } |
| } |
| |
| @Override |
| public void writeBoxedBoolean(@Nullable Boolean val) { |
| if (val != null) { |
| lastFinished = buf.remaining() >= 1 + 1; |
| |
| if (lastFinished) { |
| writeBoolean(true); |
| |
| writeBoolean(val); |
| } |
| } else { |
| writeBoolean(false); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeByteArray(byte[] val) { |
| if (val != null) { |
| lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length); |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeByteArray(byte[] val, long off, int len) { |
| if (val != null) { |
| lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len); |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeShortArray(short[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, SHORT_ARR_OFF, val.length, 2, 1); |
| } else { |
| lastFinished = writeArray(val, SHORT_ARR_OFF, val.length, val.length << 1); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeIntArray(int[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, INT_ARR_OFF, val.length, 4, 2); |
| } else { |
| lastFinished = writeArray(val, INT_ARR_OFF, val.length, val.length << 2); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeLongArray(long[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, val.length, 8, 3); |
| } else { |
| lastFinished = writeArray(val, LONG_ARR_OFF, val.length, val.length << 3); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeLongArray(long[] val, int len) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, LONG_ARR_OFF, len, 8, 3); |
| } else { |
| lastFinished = writeArray(val, LONG_ARR_OFF, len, len << 3); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeFloatArray(float[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, FLOAT_ARR_OFF, val.length, 4, 2); |
| } else { |
| lastFinished = writeArray(val, FLOAT_ARR_OFF, val.length, val.length << 2); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeDoubleArray(double[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, DOUBLE_ARR_OFF, val.length, 8, 3); |
| } else { |
| lastFinished = writeArray(val, DOUBLE_ARR_OFF, val.length, val.length << 3); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeCharArray(char[] val) { |
| if (val != null) { |
| if (IS_BIG_ENDIAN) { |
| lastFinished = writeArrayLittleEndian(val, CHAR_ARR_OFF, val.length, 2, 1); |
| } else { |
| lastFinished = writeArray(val, CHAR_ARR_OFF, val.length, val.length << 1); |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeBooleanArray(boolean[] val) { |
| if (val != null) { |
| lastFinished = writeArray(val, GridUnsafe.BOOLEAN_ARR_OFF, val.length, val.length); |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeString(String val) { |
| if (val != null) { |
| if (curStrBackingArr == null) { |
| curStrBackingArr = val.getBytes(UTF_8); |
| } |
| |
| writeByteArray(curStrBackingArr); |
| |
| if (lastFinished) { |
| curStrBackingArr = null; |
| } |
| } else { |
| writeByteArray(null); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeBitSet(BitSet val) { |
| writeLongArray(val != null ? val.toLongArray() : null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeByteBuffer(ByteBuffer val) { |
| switch (byteBufferState) { |
| case 0: |
| byte flag = 0; |
| |
| if (val != null) { |
| flag |= BYTE_BUFFER_NOT_NULL_FLAG; |
| |
| if (val.order() == ByteOrder.BIG_ENDIAN) { |
| flag |= BYTE_BUFFER_BIG_ENDIAN_FLAG; |
| } |
| } |
| |
| writeByte(flag); |
| |
| if (!lastFinished || val == null) { |
| return; |
| } |
| |
| byteBufferState++; |
| |
| //noinspection fallthrough |
| case 1: |
| assert !val.isReadOnly(); |
| |
| int position = val.position(); |
| int length = val.limit() - position; |
| |
| if (val.isDirect()) { |
| lastFinished = writeArray(null, GridUnsafe.bufferAddress(val) + position, length, length); |
| } else { |
| lastFinished = writeArray(val.array(), BYTE_ARR_OFF + val.arrayOffset() + position, length, length); |
| } |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| byteBufferState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown byteBufferState: " + byteBufferState); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeUuid(UUID val) { |
| switch (uuidState) { |
| case 0: |
| writeBoolean(val == null); |
| |
| if (!lastFinished || val == null) { |
| return; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 1: |
| writeLong(val.getMostSignificantBits()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 2: |
| writeLong(val.getLeastSignificantBits()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| uuidState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown uuidState: " + uuidState); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeIgniteUuid(IgniteUuid val) { |
| switch (uuidState) { |
| case 0: |
| writeBoolean(val == null); |
| |
| if (!lastFinished || val == null) { |
| return; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 1: |
| writeLong(val.globalId().getMostSignificantBits()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 2: |
| writeLong(val.globalId().getLeastSignificantBits()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 3: |
| writeLong(val.localId()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| uuidState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown uuidState: " + uuidState); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void writeMessage(NetworkMessage msg, MessageWriter writer) { |
| if (msg != null) { |
| if (buf.hasRemaining()) { |
| try { |
| writer.beforeInnerMessageWrite(); |
| |
| writer.setCurrentWriteClass(msg.getClass()); |
| |
| if (msgSerializer == null) { |
| msgSerializer = serializationRegistry.createSerializer(msg.groupType(), msg.messageType()); |
| } |
| |
| writer.setBuffer(buf); |
| |
| lastFinished = msgSerializer.writeMessage(msg, writer); |
| |
| if (lastFinished) { |
| msgSerializer = null; |
| } |
| } finally { |
| writer.afterInnerMessageWrite(lastFinished); |
| } |
| } else { |
| lastFinished = false; |
| } |
| } else { |
| writeShort(Short.MIN_VALUE); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, |
| MessageWriter writer) { |
| if (arr != null) { |
| int len = arr.length; |
| |
| if (arrPos == -1) { |
| writeInt(len); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| arrPos = 0; |
| } |
| |
| while (arrPos < len || arrCur != NULL) { |
| if (arrCur == NULL) { |
| arrCur = arr[arrPos++]; |
| } |
| |
| write(itemType, arrCur, writer); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| arrCur = NULL; |
| } |
| |
| arrPos = -1; |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, |
| MessageWriter writer) { |
| if (col != null) { |
| if (col instanceof List && col instanceof RandomAccess) { |
| writeRandomAccessList((List<T>) col, itemType, writer); |
| } else { |
| if (it == null) { |
| writeInt(col.size()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| it = col.iterator(); |
| } |
| |
| while (it.hasNext() || cur != NULL) { |
| if (cur == NULL) { |
| cur = it.next(); |
| } |
| |
| write(itemType, cur, writer); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| cur = NULL; |
| } |
| |
| it = null; |
| } |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| @Override |
| public <T> void writeSet(Set<T> set, MessageCollectionItemType itemType, MessageWriter writer) { |
| writeCollection(set, itemType, writer); |
| } |
| |
| /** |
| * Writes {@link List}. |
| * |
| * @param list List. |
| * @param itemType Component type. |
| * @param writer Writer. |
| */ |
| private <T> void writeRandomAccessList(List<T> list, MessageCollectionItemType itemType, MessageWriter writer) { |
| assert list instanceof RandomAccess; |
| |
| int size = list.size(); |
| |
| if (arrPos == -1) { |
| writeInt(size); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| arrPos = 0; |
| } |
| |
| while (arrPos < size || arrCur != NULL) { |
| if (arrCur == NULL) { |
| arrCur = list.get(arrPos++); |
| } |
| |
| write(itemType, arrCur, writer); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| arrCur = NULL; |
| } |
| |
| arrPos = -1; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <K, V> void writeMap(Map<K, V> map, MessageCollectionItemType keyType, |
| MessageCollectionItemType valType, MessageWriter writer) { |
| if (map != null) { |
| if (mapIt == null) { |
| writeInt(map.size()); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| mapIt = map.entrySet().iterator(); |
| } |
| |
| while (mapIt.hasNext() || mapCur != NULL) { |
| if (mapCur == NULL) { |
| mapCur = mapIt.next(); |
| } |
| |
| Map.Entry<K, V> e = (Map.Entry<K, V>) mapCur; |
| |
| if (!keyDone) { |
| write(keyType, e.getKey(), writer); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| keyDone = true; |
| } |
| |
| write(valType, e.getValue(), writer); |
| |
| if (!lastFinished) { |
| return; |
| } |
| |
| mapCur = NULL; |
| keyDone = false; |
| } |
| |
| mapIt = null; |
| } else { |
| writeInt(-1); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public byte readByte() { |
| lastFinished = buf.remaining() >= 1; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 1); |
| |
| return GridUnsafe.getByte(heapArr, baseOff + pos); |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public @Nullable Byte readBoxedByte() { |
| return readBoxedValue(this::readByte); |
| } |
| |
| private <T> @Nullable T readBoxedValue(Supplier<T> valueReader) { |
| // First, check if we have read the null flag in a previous call. |
| if (boxedTypeNotNull || readBoolean()) { |
| boxedTypeNotNull = true; |
| |
| T result = valueReader.get(); |
| |
| // If the whole value has been read successfully, reset the state. |
| if (lastFinished) { |
| boxedTypeNotNull = false; |
| } |
| |
| return result; |
| } else { |
| return null; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public short readShort() { |
| lastFinished = buf.remaining() >= 2; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 2); |
| |
| long off = baseOff + pos; |
| |
| return IS_BIG_ENDIAN ? GridUnsafe.getShortLittleEndian(heapArr, off) : GridUnsafe.getShort(heapArr, off); |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public @Nullable Short readBoxedShort() { |
| return readBoxedValue(this::readShort); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int readInt() { |
| lastFinished = false; |
| |
| int val = 0; |
| |
| int pos = buf.position(); |
| |
| int limit = buf.limit(); |
| |
| while (pos < limit) { |
| byte b = GridUnsafe.getByte(heapArr, baseOff + pos); |
| |
| pos++; |
| |
| prim |= ((long) b & 0x7F) << (7 * primShift); |
| |
| if ((b & 0x80) == 0) { |
| lastFinished = true; |
| |
| val = (int) prim - 1; |
| |
| prim = 0; |
| primShift = 0; |
| |
| break; |
| } else { |
| primShift++; |
| } |
| } |
| |
| buf.position(pos); |
| |
| return val; |
| } |
| |
| @Override |
| public @Nullable Integer readBoxedInt() { |
| return readBoxedValue(this::readInt); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public long readLong() { |
| lastFinished = false; |
| |
| long val = 0; |
| |
| int pos = buf.position(); |
| |
| int limit = buf.limit(); |
| |
| while (pos < limit) { |
| byte b = GridUnsafe.getByte(heapArr, baseOff + pos); |
| |
| pos++; |
| |
| prim |= ((long) b & 0x7F) << (7 * primShift); |
| |
| if ((b & 0x80) == 0) { |
| lastFinished = true; |
| |
| val = prim - 1; |
| |
| prim = 0; |
| primShift = 0; |
| |
| break; |
| } else { |
| primShift++; |
| } |
| } |
| |
| buf.position(pos); |
| |
| return val; |
| } |
| |
| @Override |
| public @Nullable Long readBoxedLong() { |
| return readBoxedValue(this::readLong); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public float readFloat() { |
| lastFinished = buf.remaining() >= 4; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 4); |
| |
| long off = baseOff + pos; |
| |
| return IS_BIG_ENDIAN ? GridUnsafe.getFloatLittleEndian(heapArr, off) : GridUnsafe.getFloat(heapArr, off); |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public @Nullable Float readBoxedFloat() { |
| return readBoxedValue(this::readFloat); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public double readDouble() { |
| lastFinished = buf.remaining() >= 8; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 8); |
| |
| long off = baseOff + pos; |
| |
| return IS_BIG_ENDIAN ? GridUnsafe.getDoubleLittleEndian(heapArr, off) : GridUnsafe.getDouble(heapArr, off); |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public @Nullable Double readBoxedDouble() { |
| return readBoxedValue(this::readDouble); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public char readChar() { |
| lastFinished = buf.remaining() >= 2; |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 2); |
| |
| long off = baseOff + pos; |
| |
| return IS_BIG_ENDIAN ? GridUnsafe.getCharLittleEndian(heapArr, off) : GridUnsafe.getChar(heapArr, off); |
| } else { |
| return 0; |
| } |
| } |
| |
| @Override |
| public @Nullable Character readBoxedChar() { |
| return readBoxedValue(this::readChar); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean readBoolean() { |
| lastFinished = buf.hasRemaining(); |
| |
| if (lastFinished) { |
| int pos = buf.position(); |
| |
| buf.position(pos + 1); |
| |
| return GridUnsafe.getBoolean(heapArr, baseOff + pos); |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public @Nullable Boolean readBoxedBoolean() { |
| return readBoxedValue(this::readBoolean); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public byte[] readByteArray() { |
| return readArray(BYTE_ARRAY, 0, BYTE_ARR_OFF); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public short[] readShortArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(SHORT_ARRAY, 2, 1, SHORT_ARR_OFF); |
| } else { |
| return readArray(SHORT_ARRAY, 1, SHORT_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int[] readIntArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(INT_ARRAY, 4, 2, INT_ARR_OFF); |
| } else { |
| return readArray(INT_ARRAY, 2, INT_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public long[] readLongArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(LONG_ARRAY, 8, 3, LONG_ARR_OFF); |
| } else { |
| return readArray(LONG_ARRAY, 3, LONG_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public float[] readFloatArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(FLOAT_ARRAY, 4, 2, FLOAT_ARR_OFF); |
| } else { |
| return readArray(FLOAT_ARRAY, 2, FLOAT_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public double[] readDoubleArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(DOUBLE_ARRAY, 8, 3, DOUBLE_ARR_OFF); |
| } else { |
| return readArray(DOUBLE_ARRAY, 3, DOUBLE_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public char[] readCharArray() { |
| if (IS_BIG_ENDIAN) { |
| return readArrayLittleEndian(CHAR_ARRAY, 2, 1, CHAR_ARR_OFF); |
| } else { |
| return readArray(CHAR_ARRAY, 1, CHAR_ARR_OFF); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean[] readBooleanArray() { |
| return readArray(BOOLEAN_ARRAY, 0, GridUnsafe.BOOLEAN_ARR_OFF); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String readString() { |
| byte[] arr = readByteArray(); |
| |
| return arr != null ? new String(arr, UTF_8) : null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public BitSet readBitSet() { |
| long[] arr = readLongArray(); |
| |
| return arr != null ? BitSet.valueOf(arr) : null; |
| } |
| |
| @Override |
| public ByteBuffer readByteBuffer() { |
| byte[] bytes; |
| |
| switch (byteBufferState) { |
| case 0: |
| byteBufferFlag = readByte(); |
| |
| boolean isNull = (byteBufferFlag & BYTE_BUFFER_NOT_NULL_FLAG) == 0; |
| |
| if (!lastFinished || isNull) { |
| return null; |
| } |
| |
| byteBufferState++; |
| |
| //noinspection fallthrough |
| case 1: |
| bytes = readByteArray(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| byteBufferState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown byteBufferState: " + byteBufferState); |
| } |
| |
| ByteBuffer val = ByteBuffer.wrap(bytes); |
| |
| if ((byteBufferFlag & BYTE_BUFFER_BIG_ENDIAN_FLAG) == 0) { |
| val.order(ByteOrder.LITTLE_ENDIAN); |
| } else { |
| val.order(ByteOrder.BIG_ENDIAN); |
| } |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public UUID readUuid() { |
| switch (uuidState) { |
| case 0: |
| boolean isNull = readBoolean(); |
| |
| if (!lastFinished || isNull) { |
| return null; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 1: |
| uuidMost = readLong(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 2: |
| uuidLeast = readLong(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| uuidState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown uuidState: " + uuidState); |
| } |
| |
| UUID val = new UUID(uuidMost, uuidLeast); |
| |
| uuidMost = 0; |
| uuidLeast = 0; |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public IgniteUuid readIgniteUuid() { |
| switch (uuidState) { |
| case 0: |
| boolean isNull = readBoolean(); |
| |
| if (!lastFinished || isNull) { |
| return null; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 1: |
| uuidMost = readLong(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 2: |
| uuidLeast = readLong(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| uuidState++; |
| |
| //noinspection fallthrough |
| case 3: |
| uuidLocId = readLong(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| uuidState = 0; |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown uuidState: " + uuidState); |
| } |
| |
| final IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), uuidLocId); |
| |
| uuidMost = 0; |
| uuidLeast = 0; |
| uuidLocId = 0; |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| @Nullable |
| public <T extends NetworkMessage> T readMessage(MessageReader reader) { |
| // if the deserialzer is null then we haven't finished reading the message header |
| if (msgDeserializer == null) { |
| // read the message group type |
| if (!msgGroupTypeRead) { |
| msgGroupType = readShort(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| // message group type will be equal to Short.MIN_VALUE if a nested message is null |
| if (msgGroupType == Short.MIN_VALUE) { // lastFinished is "true" here, so no further parsing will be required |
| return null; |
| } |
| |
| // save current progress, because we can read the header in two chunks |
| msgGroupTypeRead = true; |
| } |
| |
| // read the message type |
| short msgType = readShort(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| msgDeserializer = serializationRegistry.createDeserializer(msgGroupType, msgType); |
| } |
| |
| // if the deserializer is not null then we have definitely finished parsing the header and can read the message |
| // body |
| reader.beforeInnerMessageRead(); |
| |
| try { |
| reader.setCurrentReadClass(msgDeserializer.klass()); |
| |
| reader.setBuffer(buf); |
| |
| lastFinished = msgDeserializer.readMessage(reader); |
| } finally { |
| reader.afterInnerMessageRead(lastFinished); |
| } |
| |
| if (lastFinished) { |
| T result = (T) msgDeserializer.getMessage(); |
| |
| msgGroupTypeRead = false; |
| msgDeserializer = null; |
| |
| return result; |
| } else { |
| return null; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, |
| MessageReader reader) { |
| if (readSize == -1) { |
| int size = readInt(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| readSize = size; |
| } |
| |
| if (readSize >= 0) { |
| if (objArr == null) { |
| objArr = itemCls != null ? (Object[]) Array.newInstance(itemCls, readSize) : new Object[readSize]; |
| } |
| |
| for (int i = readItems; i < readSize; i++) { |
| Object item = read(itemType, reader); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| objArr[i] = item; |
| |
| readItems++; |
| } |
| } |
| |
| readSize = -1; |
| readItems = 0; |
| cur = null; |
| |
| T[] objArr0 = (T[]) objArr; |
| |
| objArr = null; |
| |
| return objArr0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, |
| MessageReader reader) { |
| return (C) readCollection0(itemType, reader, ArrayList::new); |
| } |
| |
| @Override |
| public <C extends Set<?>> C readSet(MessageCollectionItemType itemType, MessageReader reader) { |
| return (C) readCollection0(itemType, reader, HashSet::new); |
| } |
| |
| /** |
| * Common implementation for {@link #readCollection(MessageCollectionItemType, MessageReader)} and |
| * {@link #readSet(MessageCollectionItemType, MessageReader)}. Reads a sequence of objects and puts it into a collection, created by |
| * {@code ctor}. |
| * |
| * @param itemType Collection item type. |
| * @param reader Message reader instance. |
| * @param ctor Factory for creating a collection using its length. |
| * |
| * @return Collection, read from the reader, or {@code null} if reading has not completed yet. |
| */ |
| private @Nullable Collection<?> readCollection0( |
| MessageCollectionItemType itemType, |
| MessageReader reader, |
| IntFunction<Collection<Object>> ctor |
| ) { |
| if (readSize == -1) { |
| int size = readInt(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| readSize = size; |
| } |
| |
| if (readSize >= 0) { |
| if (col == null) { |
| col = ctor.apply(readSize); |
| } |
| |
| for (int i = readItems; i < readSize; i++) { |
| Object item = read(itemType, reader); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| col.add(item); |
| |
| readItems++; |
| } |
| } |
| |
| readSize = -1; |
| readItems = 0; |
| cur = null; |
| |
| Collection<?> col0 = col; |
| |
| col = null; |
| |
| return col0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, |
| MessageCollectionItemType valType, boolean linked, MessageReader reader) { |
| if (readSize == -1) { |
| int size = readInt(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| readSize = size; |
| } |
| |
| if (readSize >= 0) { |
| if (map == null) { |
| map = linked ? IgniteUtils.newLinkedHashMap(readSize) : IgniteUtils.newHashMap(readSize); |
| } |
| |
| for (int i = readItems; i < readSize; i++) { |
| if (!keyDone) { |
| Object key = read(keyType, reader); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| mapCur = key; |
| keyDone = true; |
| } |
| |
| Object val = read(valType, reader); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| map.put(mapCur, val); |
| |
| keyDone = false; |
| |
| readItems++; |
| } |
| } |
| |
| readSize = -1; |
| readItems = 0; |
| mapCur = null; |
| |
| M map0 = (M) map; |
| |
| map = null; |
| |
| return map0; |
| } |
| |
| /** |
| * Writes array. |
| * |
| * @param arr Array. |
| * @param off Offset. |
| * @param len Length. |
| * @param bytes Length in bytes. |
| * @return Whether array was fully written. |
| */ |
| boolean writeArray(@Nullable Object arr, long off, int len, int bytes) { |
| assert arr == null || arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); |
| assert off > 0; |
| assert len >= 0; |
| assert bytes >= 0; |
| assert bytes >= arrOff; |
| |
| if (writeArrayLength(len)) { |
| return false; |
| } |
| |
| int toWrite = bytes - arrOff; |
| int pos = buf.position(); |
| int remaining = buf.remaining(); |
| |
| if (toWrite <= remaining) { |
| if (toWrite > 0) { |
| GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite); |
| |
| buf.position(pos + toWrite); |
| } |
| |
| arrOff = -1; |
| |
| return true; |
| } else { |
| if (remaining > 0) { |
| GridUnsafe.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining); |
| |
| buf.position(pos + remaining); |
| |
| arrOff += remaining; |
| } |
| |
| return false; |
| } |
| } |
| |
| /** |
| * Writes array. |
| * |
| * @param arr Array. |
| * @param off Offset. |
| * @param len Length. |
| * @param typeSize Primitive type size in bytes. Needs for byte reverse. |
| * @param shiftCnt Shift for length. |
| * @return Whether array was fully written. |
| */ |
| boolean writeArrayLittleEndian(Object arr, long off, int len, int typeSize, int shiftCnt) { |
| assert arr != null; |
| assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive(); |
| assert off > 0; |
| assert len >= 0; |
| |
| int bytes = len << shiftCnt; |
| |
| assert bytes >= arrOff; |
| |
| if (writeArrayLength(len)) { |
| return false; |
| } |
| |
| int toWrite = (bytes - arrOff) >> shiftCnt; |
| int remaining = buf.remaining() >> shiftCnt; |
| |
| if (toWrite <= remaining) { |
| writeArrayLittleEndian(arr, off, toWrite, typeSize); |
| |
| arrOff = -1; |
| |
| return true; |
| } else { |
| if (remaining > 0) { |
| writeArrayLittleEndian(arr, off, remaining, typeSize); |
| } |
| |
| return false; |
| } |
| } |
| |
| /** |
| * Writes array. |
| * |
| * @param arr Array. |
| * @param off Offset. |
| * @param len Length. |
| * @param typeSize Primitive type size in bytes. |
| */ |
| private void writeArrayLittleEndian(Object arr, long off, int len, int typeSize) { |
| int pos = buf.position(); |
| |
| for (int i = 0; i < len; i++) { |
| for (int j = 0; j < typeSize; j++) { |
| byte b = GridUnsafe.getByteField(arr, off + arrOff + (typeSize - j - 1)); |
| |
| GridUnsafe.putByte(heapArr, baseOff + pos++, b); |
| } |
| |
| buf.position(pos); |
| arrOff += typeSize; |
| } |
| } |
| |
| /** |
| * Writes array length. |
| * |
| * @param len Length. |
| */ |
| private boolean writeArrayLength(int len) { |
| if (arrOff == -1) { |
| writeInt(len); |
| |
| if (!lastFinished) { |
| return true; |
| } |
| |
| arrOff = 0; |
| } |
| return false; |
| } |
| |
| /** |
| * Reads array. |
| * |
| * @param <T> Type of array. |
| * @param creator Array creator. |
| * @param lenShift Array length shift size. |
| * @param off Base offset. |
| * @return Array or special value if it was not fully read. |
| */ |
| <T> T readArray(ArrayFactory<T> creator, int lenShift, long off) { |
| assert creator != null; |
| |
| if (tmpArr == null) { |
| int len = readInt(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| switch (len) { |
| case -1: |
| lastFinished = true; |
| |
| return null; |
| |
| case 0: |
| lastFinished = true; |
| |
| return creator.of(0); |
| |
| default: |
| tmpArr = creator.of(len); |
| tmpArrBytes = len << lenShift; |
| } |
| } |
| |
| int toRead = tmpArrBytes - tmpArrOff; |
| int remaining = buf.remaining(); |
| int pos = buf.position(); |
| |
| lastFinished = toRead <= remaining; |
| |
| if (lastFinished) { |
| GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead); |
| |
| buf.position(pos + toRead); |
| |
| final T arr = (T) tmpArr; |
| |
| tmpArr = null; |
| tmpArrBytes = 0; |
| tmpArrOff = 0; |
| |
| return arr; |
| } else { |
| GridUnsafe.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining); |
| |
| buf.position(pos + remaining); |
| |
| tmpArrOff += remaining; |
| |
| return null; |
| } |
| } |
| |
| /** |
| * Reads array. |
| * |
| * @param <T> Type of array. |
| * @param creator Array creator. |
| * @param typeSize Primitive type size in bytes. |
| * @param lenShift Array length shift size. |
| * @param off Base offset. |
| * @return Array or special value if it was not fully read. |
| */ |
| <T> T readArrayLittleEndian(ArrayFactory<T> creator, int typeSize, int lenShift, long off) { |
| assert creator != null; |
| |
| if (tmpArr == null) { |
| int len = readInt(); |
| |
| if (!lastFinished) { |
| return null; |
| } |
| |
| switch (len) { |
| case -1: |
| lastFinished = true; |
| |
| return null; |
| |
| case 0: |
| lastFinished = true; |
| |
| return creator.of(0); |
| |
| default: |
| tmpArr = creator.of(len); |
| tmpArrBytes = len << lenShift; |
| } |
| } |
| |
| int toRead = tmpArrBytes - tmpArrOff - valReadBytes; |
| int remaining = buf.remaining(); |
| |
| lastFinished = toRead <= remaining; |
| |
| if (!lastFinished) { |
| toRead = remaining; |
| } |
| |
| int pos = buf.position(); |
| |
| for (int i = 0; i < toRead; i++) { |
| byte b = GridUnsafe.getByte(heapArr, baseOff + pos + i); |
| |
| GridUnsafe.putByteField(tmpArr, off + tmpArrOff + (typeSize - valReadBytes - 1), b); |
| |
| if (++valReadBytes == typeSize) { |
| valReadBytes = 0; |
| tmpArrOff += typeSize; |
| } |
| } |
| |
| buf.position(pos + toRead); |
| |
| if (lastFinished) { |
| final T arr = (T) tmpArr; |
| |
| tmpArr = null; |
| tmpArrBytes = 0; |
| tmpArrOff = 0; |
| |
| return arr; |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Writes value. |
| * |
| * @param type Type. |
| * @param val Value. |
| * @param writer Writer. |
| */ |
| protected void write(MessageCollectionItemType type, Object val, MessageWriter writer) { |
| switch (type) { |
| case BYTE: |
| writeByte((Byte) val); |
| |
| break; |
| |
| case SHORT: |
| writeShort((Short) val); |
| |
| break; |
| |
| case INT: |
| writeInt((Integer) val); |
| |
| break; |
| |
| case LONG: |
| writeLong((Long) val); |
| |
| break; |
| |
| case FLOAT: |
| writeFloat((Float) val); |
| |
| break; |
| |
| case DOUBLE: |
| writeDouble((Double) val); |
| |
| break; |
| |
| case CHAR: |
| writeChar((Character) val); |
| |
| break; |
| |
| case BOOLEAN: |
| writeBoolean((Boolean) val); |
| |
| break; |
| |
| case BYTE_ARR: |
| writeByteArray((byte[]) val); |
| |
| break; |
| |
| case SHORT_ARR: |
| writeShortArray((short[]) val); |
| |
| break; |
| |
| case INT_ARR: |
| writeIntArray((int[]) val); |
| |
| break; |
| |
| case LONG_ARR: |
| writeLongArray((long[]) val); |
| |
| break; |
| |
| case FLOAT_ARR: |
| writeFloatArray((float[]) val); |
| |
| break; |
| |
| case DOUBLE_ARR: |
| writeDoubleArray((double[]) val); |
| |
| break; |
| |
| case CHAR_ARR: |
| writeCharArray((char[]) val); |
| |
| break; |
| |
| case BOOLEAN_ARR: |
| writeBooleanArray((boolean[]) val); |
| |
| break; |
| |
| case STRING: |
| writeString((String) val); |
| |
| break; |
| |
| case BIT_SET: |
| writeBitSet((BitSet) val); |
| |
| break; |
| |
| case BYTE_BUFFER: |
| writeByteBuffer((ByteBuffer) val); |
| |
| break; |
| |
| case UUID: |
| writeUuid((UUID) val); |
| |
| break; |
| |
| case IGNITE_UUID: |
| writeIgniteUuid((IgniteUuid) val); |
| |
| break; |
| |
| case MSG: |
| try { |
| if (val != null) { |
| writer.beforeInnerMessageWrite(); |
| } |
| |
| writeMessage((NetworkMessage) val, writer); |
| } finally { |
| if (val != null) { |
| writer.afterInnerMessageWrite(lastFinished); |
| } |
| } |
| |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown type: " + type); |
| } |
| } |
| |
| /** |
| * Reads value. |
| * |
| * @param type Type. |
| * @param reader Reader. |
| * @return Value. |
| */ |
| protected Object read(MessageCollectionItemType type, MessageReader reader) { |
| switch (type) { |
| case BYTE: |
| return readByte(); |
| |
| case SHORT: |
| return readShort(); |
| |
| case INT: |
| return readInt(); |
| |
| case LONG: |
| return readLong(); |
| |
| case FLOAT: |
| return readFloat(); |
| |
| case DOUBLE: |
| return readDouble(); |
| |
| case CHAR: |
| return readChar(); |
| |
| case BOOLEAN: |
| return readBoolean(); |
| |
| case BYTE_ARR: |
| return readByteArray(); |
| |
| case SHORT_ARR: |
| return readShortArray(); |
| |
| case INT_ARR: |
| return readIntArray(); |
| |
| case LONG_ARR: |
| return readLongArray(); |
| |
| case FLOAT_ARR: |
| return readFloatArray(); |
| |
| case DOUBLE_ARR: |
| return readDoubleArray(); |
| |
| case CHAR_ARR: |
| return readCharArray(); |
| |
| case BOOLEAN_ARR: |
| return readBooleanArray(); |
| |
| case STRING: |
| return readString(); |
| |
| case BIT_SET: |
| return readBitSet(); |
| |
| case BYTE_BUFFER: |
| return readByteBuffer(); |
| |
| case UUID: |
| return readUuid(); |
| |
| case IGNITE_UUID: |
| return readIgniteUuid(); |
| |
| case MSG: |
| return readMessage(reader); |
| |
| default: |
| throw new IllegalArgumentException("Unknown type: " + type); |
| } |
| } |
| } |