blob: 47d78778168247951415dac5f482342a4aa156c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.direct;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.direct.stream.v1.DirectByteBufferStreamImplV1;
import org.apache.ignite.internal.direct.stream.v2.DirectByteBufferStreamImplV2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.jetbrains.annotations.Nullable;
/**
* Message reader implementation.
*/
public class DirectMessageReader implements MessageReader {
/** State. */
private final DirectMessageState<StateItem> state;
/** Whether last field was fully read. */
private boolean lastRead;
/**
* @param msgFactory Message factory.
* @param protoVer Protocol version.
*/
public DirectMessageReader(final MessageFactory msgFactory, final byte protoVer) {
state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, protoVer);
}
});
}
/** {@inheritDoc} */
@Override public void setBuffer(ByteBuffer buf) {
state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
@Override public void setCurrentReadClass(Class<? extends Message> msgCls) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean beforeMessageRead() {
return true;
}
/** {@inheritDoc}
* @param msgCls*/
@Override public boolean afterMessageRead(Class<? extends Message> msgCls) {
return true;
}
/** {@inheritDoc} */
@Override public byte readByte(String name) {
DirectByteBufferStream stream = state.item().stream;
byte val = stream.readByte();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public short readShort(String name) {
DirectByteBufferStream stream = state.item().stream;
short val = stream.readShort();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public int readInt(String name) {
DirectByteBufferStream stream = state.item().stream;
int val = stream.readInt();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public int readInt(String name, int dflt) {
return readInt(name);
}
/** {@inheritDoc} */
@Override public long readLong(String name) {
DirectByteBufferStream stream = state.item().stream;
long val = stream.readLong();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public float readFloat(String name) {
DirectByteBufferStream stream = state.item().stream;
float val = stream.readFloat();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public double readDouble(String name) {
DirectByteBufferStream stream = state.item().stream;
double val = stream.readDouble();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public char readChar(String name) {
DirectByteBufferStream stream = state.item().stream;
char val = stream.readChar();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public boolean readBoolean(String name) {
DirectByteBufferStream stream = state.item().stream;
boolean val = stream.readBoolean();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Nullable @Override public byte[] readByteArray(String name) {
DirectByteBufferStream stream = state.item().stream;
byte[] arr = stream.readByteArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public short[] readShortArray(String name) {
DirectByteBufferStream stream = state.item().stream;
short[] arr = stream.readShortArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public int[] readIntArray(String name) {
DirectByteBufferStream stream = state.item().stream;
int[] arr = stream.readIntArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public long[] readLongArray(String name) {
DirectByteBufferStream stream = state.item().stream;
long[] arr = stream.readLongArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public float[] readFloatArray(String name) {
DirectByteBufferStream stream = state.item().stream;
float[] arr = stream.readFloatArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public double[] readDoubleArray(String name) {
DirectByteBufferStream stream = state.item().stream;
double[] arr = stream.readDoubleArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public char[] readCharArray(String name) {
DirectByteBufferStream stream = state.item().stream;
char[] arr = stream.readCharArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Nullable @Override public boolean[] readBooleanArray(String name) {
DirectByteBufferStream stream = state.item().stream;
boolean[] arr = stream.readBooleanArray();
lastRead = stream.lastFinished();
return arr;
}
/** {@inheritDoc} */
@Override public String readString(String name) {
DirectByteBufferStream stream = state.item().stream;
String val = stream.readString();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public BitSet readBitSet(String name) {
DirectByteBufferStream stream = state.item().stream;
BitSet val = stream.readBitSet();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public UUID readUuid(String name) {
DirectByteBufferStream stream = state.item().stream;
UUID val = stream.readUuid();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Override public IgniteUuid readIgniteUuid(String name) {
DirectByteBufferStream stream = state.item().stream;
IgniteUuid val = stream.readIgniteUuid();
lastRead = stream.lastFinished();
return val;
}
/** {@inheritDoc} */
@Nullable @Override public <T extends Message> T readMessage(String name) {
DirectByteBufferStream stream = state.item().stream;
T msg = stream.readMessage(this);
lastRead = stream.lastFinished();
return msg;
}
/** {@inheritDoc} */
@Override public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls) {
DirectByteBufferStream stream = state.item().stream;
T[] msg = stream.readObjectArray(itemType, itemCls, this);
lastRead = stream.lastFinished();
return msg;
}
/** {@inheritDoc} */
@Override public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
C col = stream.readCollection(itemType, this);
lastRead = stream.lastFinished();
return col;
}
/** {@inheritDoc} */
@Override public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType,
MessageCollectionItemType valType, boolean linked) {
DirectByteBufferStream stream = state.item().stream;
M map = stream.readMap(keyType, valType, linked, this);
lastRead = stream.lastFinished();
return map;
}
/** {@inheritDoc} */
@Override public boolean isLastRead() {
return lastRead;
}
/** {@inheritDoc} */
@Override public int state() {
return state.item().state;
}
/** {@inheritDoc} */
@Override public void incrementState() {
state.item().state++;
}
/** {@inheritDoc} */
@Override public void beforeInnerMessageRead() {
state.forward();
}
/** {@inheritDoc} */
@Override public void afterInnerMessageRead(boolean finished) {
state.backward(finished);
}
/** {@inheritDoc} */
@Override public void reset() {
state.reset();
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DirectMessageReader.class, this);
}
/**
*/
private static class StateItem implements DirectMessageStateItem {
/** Stream. */
private final DirectByteBufferStream stream;
/** State. */
private int state;
/**
* @param msgFactory Message factory.
* @param protoVer Protocol version.
*/
public StateItem(MessageFactory msgFactory, byte protoVer) {
switch (protoVer) {
case 1:
stream = new DirectByteBufferStreamImplV1(msgFactory);
break;
case 2:
stream = new DirectByteBufferStreamImplV2(msgFactory);
break;
default:
throw new IllegalStateException("Invalid protocol version: " + protoVer);
}
}
/** {@inheritDoc} */
@Override public void reset() {
state = 0;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(StateItem.class, this);
}
}
}