blob: e11b5a32a84da86954f8df39aa6272ba5129025f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.network.direct;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.network.direct.state.DirectMessageState;
import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageReader;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;
/**
* Message reader implementation.
*/
public class DirectMessageReader implements MessageReader {
/** State. */
private final DirectMessageState<StateItem> state;
/** Whether last field was fully read. */
private boolean lastRead;
/**
* Constructor.
*
* @param serializationRegistry Serialization registry.
* @param protoVer Protocol version.
*/
public DirectMessageReader(
MessageSerializationRegistry serializationRegistry,
byte protoVer
) {
state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
}
/** {@inheritDoc} */
@Override
public void setBuffer(ByteBuffer buf) {
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
@Override
public void setCurrentReadClass(Class<? extends NetworkMessage> msgCls) {
// No-op.
}
/** {@inheritDoc} */
@Override
public boolean beforeMessageRead() {
return true;
}
/** {@inheritDoc} */
@Override
public boolean afterMessageRead(Class<? extends NetworkMessage> msgCls) {
return true;
}
/** {@inheritDoc} */
@Override
public byte readByte(String name) {
DirectByteBufferStream stream = state.item().stream;
byte val = stream.readByte();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Byte readBoxedByte(String name) {
DirectByteBufferStream stream = state.item().stream;
Byte val = stream.readBoxedByte();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public short readShort(String name) {
DirectByteBufferStream stream = state.item().stream;
short val = stream.readShort();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Short readBoxedShort(String name) {
DirectByteBufferStream stream = state.item().stream;
Short val = stream.readBoxedShort();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public int readInt(String name) {
DirectByteBufferStream stream = state.item().stream;
int val = stream.readInt();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public int readInt(String name, int dflt) {
return readInt(name);
}
@Override
public @Nullable Integer readBoxedInt(String name) {
DirectByteBufferStream stream = state.item().stream;
Integer val = stream.readBoxedInt();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public long readLong(String name) {
DirectByteBufferStream stream = state.item().stream;
long val = stream.readLong();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Long readBoxedLong(String name) {
DirectByteBufferStream stream = state.item().stream;
Long val = stream.readBoxedLong();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public float readFloat(String name) {
DirectByteBufferStream stream = state.item().stream;
float val = stream.readFloat();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Float readBoxedFloat(String name) {
DirectByteBufferStream stream = state.item().stream;
Float val = stream.readBoxedFloat();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public double readDouble(String name) {
DirectByteBufferStream stream = state.item().stream;
double val = stream.readDouble();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Double readBoxedDouble(String name) {
DirectByteBufferStream stream = state.item().stream;
Double val = stream.readBoxedDouble();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public char readChar(String name) {
DirectByteBufferStream stream = state.item().stream;
char val = stream.readChar();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Character readBoxedChar(String name) {
DirectByteBufferStream stream = state.item().stream;
Character val = stream.readBoxedChar();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public boolean readBoolean(String name) {
DirectByteBufferStream stream = state.item().stream;
boolean val = stream.readBoolean();
lastRead = stream.lastFinished();
return val;
}
@Override
public @Nullable Boolean readBoxedBoolean(String name) {
DirectByteBufferStream stream = state.item().stream;
Boolean val = stream.readBoxedBoolean();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Nullable
@Override
public byte[] readByteArray(String name) {
DirectByteBufferStream stream = state.item().stream;
byte[] arr = stream.readByteArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public short[] readShortArray(String name) {
DirectByteBufferStream stream = state.item().stream;
short[] arr = stream.readShortArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public int[] readIntArray(String name) {
DirectByteBufferStream stream = state.item().stream;
int[] arr = stream.readIntArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public long[] readLongArray(String name) {
DirectByteBufferStream stream = state.item().stream;
long[] arr = stream.readLongArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public float[] readFloatArray(String name) {
DirectByteBufferStream stream = state.item().stream;
float[] arr = stream.readFloatArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public double[] readDoubleArray(String name) {
DirectByteBufferStream stream = state.item().stream;
double[] arr = stream.readDoubleArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public char[] readCharArray(String name) {
DirectByteBufferStream stream = state.item().stream;
char[] arr = stream.readCharArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable
@Override
public boolean[] readBooleanArray(String name) {
DirectByteBufferStream stream = state.item().stream;
boolean[] arr = stream.readBooleanArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Override
public String readString(String name) {
DirectByteBufferStream stream = state.item().stream;
String val = stream.readString();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public BitSet readBitSet(String name) {
DirectByteBufferStream stream = state.item().stream;
BitSet val = stream.readBitSet();
lastRead = stream.lastFinished();
return val;
}
@Override
public ByteBuffer readByteBuffer(String name) {
DirectByteBufferStream stream = state.item().stream;
ByteBuffer val = stream.readByteBuffer();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public UUID readUuid(String name) {
DirectByteBufferStream stream = state.item().stream;
UUID val = stream.readUuid();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override
public IgniteUuid readIgniteUuid(String name) {
DirectByteBufferStream stream = state.item().stream;
IgniteUuid val = stream.readIgniteUuid();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Nullable
@Override
public <T extends NetworkMessage> T readMessage(String name) {
DirectByteBufferStream stream = state.item().stream;
T msg = stream.readMessage(this);
lastRead = stream.lastFinished();
return msg;
}
/** {@inheritDoc} */
@Override
public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls) {
DirectByteBufferStream stream = state.item().stream;
T[] msg = stream.readObjectArray(itemType, itemCls, this);
lastRead = stream.lastFinished();
return msg;
}
/** {@inheritDoc} */
@Override
public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
C col = stream.readCollection(itemType, this);
lastRead = stream.lastFinished();
return col;
}
/** {@inheritDoc} */
@Override
public <C extends List<?>> C readList(String name, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
Collection<?> col = stream.readCollection(itemType, this);
lastRead = stream.lastFinished();
assert col == null || col instanceof List : col;
return (C) col;
}
/** {@inheritDoc} */
@Override
public <C extends Set<?>> C readSet(String name, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
C col = stream.readSet(itemType, this);
lastRead = stream.lastFinished();
return col;
}
/** {@inheritDoc} */
@Override
public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
MessageCollectionItemType valType, boolean linked) {
DirectByteBufferStream stream = state.item().stream;
M map = stream.readMap(keyType, valType, linked, this);
lastRead = stream.lastFinished();
return map;
}
/** {@inheritDoc} */
@Override
public boolean isLastRead() {
return lastRead;
}
/** {@inheritDoc} */
@Override
public int state() {
return state.item().state;
}
/** {@inheritDoc} */
@Override
public void incrementState() {
state.item().state++;
}
/** {@inheritDoc} */
@Override
public void beforeInnerMessageRead() {
state.forward();
}
/** {@inheritDoc} */
@Override
public void afterInnerMessageRead(boolean finished) {
state.backward(finished);
}
/** {@inheritDoc} */
@Override
public void reset() {
state.reset();
}
/**
* Returns a stream to read message fields recursively.
*
* @param serializationRegistry Serialization registry.
* @param protoVer Protocol version.
*/
protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
switch (protoVer) {
case 1:
return new DirectByteBufferStreamImplV1(serializationRegistry);
default:
throw new IllegalStateException("Invalid protocol version: " + protoVer);
}
}
/**
* State item.
*/
private static class StateItem implements DirectMessageStateItem {
/** Stream. */
private final DirectByteBufferStream stream;
/** State. */
private int state;
/**
* Constructor.
*
* @param stream Direct byte buffer stream.
*/
StateItem(DirectByteBufferStream stream) {
this.stream = stream;
}
/** {@inheritDoc} */
@Override
public void reset() {
state = 0;
}
}
}