blob: feb848d61348bb025fb48f6cf59f7698189b4c39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.proto;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteUuid;
import org.msgpack.core.ExtensionTypeHeader;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageSizeException;
import org.msgpack.core.MessageTypeException;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.buffer.InputStreamBufferInput;
import org.msgpack.value.ImmutableValue;
import static org.apache.ignite.internal.client.proto.ClientDataType.BIGINTEGER;
import static org.apache.ignite.internal.client.proto.ClientDataType.BITMASK;
import static org.apache.ignite.internal.client.proto.ClientDataType.BOOLEAN;
import static org.apache.ignite.internal.client.proto.ClientDataType.BYTES;
import static org.apache.ignite.internal.client.proto.ClientDataType.DATE;
import static org.apache.ignite.internal.client.proto.ClientDataType.DATETIME;
import static org.apache.ignite.internal.client.proto.ClientDataType.DECIMAL;
import static org.apache.ignite.internal.client.proto.ClientDataType.DOUBLE;
import static org.apache.ignite.internal.client.proto.ClientDataType.FLOAT;
import static org.apache.ignite.internal.client.proto.ClientDataType.INT16;
import static org.apache.ignite.internal.client.proto.ClientDataType.INT32;
import static org.apache.ignite.internal.client.proto.ClientDataType.INT64;
import static org.apache.ignite.internal.client.proto.ClientDataType.INT8;
import static org.apache.ignite.internal.client.proto.ClientDataType.NUMBER;
import static org.apache.ignite.internal.client.proto.ClientDataType.STRING;
import static org.apache.ignite.internal.client.proto.ClientDataType.TIME;
import static org.apache.ignite.internal.client.proto.ClientDataType.TIMESTAMP;
/**
* Ignite-specific MsgPack extension based on Netty ByteBuf.
* <p>
* Releases wrapped buffer on {@link #close()} .
*/
public class ClientMessageUnpacker extends MessageUnpacker {
/** Underlying buffer. */
private final ByteBuf buf;
/** Underlying input. */
private final InputStreamBufferInput in;
/** Ref count. */
private int refCnt = 1;
/**
* Constructor.
*
* @param buf Input.
*/
public ClientMessageUnpacker(ByteBuf buf) {
// TODO: Remove intermediate classes and buffers IGNITE-15234.
this(new InputStreamBufferInput(new ByteBufInputStream(buf)), buf);
}
private ClientMessageUnpacker(InputStreamBufferInput in, ByteBuf buf) {
super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
this.in = in;
this.buf = buf;
}
/** {@inheritDoc} */
@Override public int unpackInt() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackInt();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public String unpackString() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void unpackNil() {
assert refCnt > 0 : "Unpacker is closed";
try {
super.unpackNil();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean unpackBoolean() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBoolean();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public byte unpackByte() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackByte();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public short unpackShort() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackShort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public long unpackLong() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackLong();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public BigInteger unpackBigInteger() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBigInteger();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public float unpackFloat() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackFloat();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public double unpackDouble() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackDouble();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackArrayHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackArrayHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackMapHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackMapHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public ExtensionTypeHeader unpackExtensionTypeHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackExtensionTypeHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackBinaryHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBinaryHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean tryUnpackNil() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.tryUnpackNil();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public byte[] readPayload(int length) {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.readPayload(length);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public MessageFormat getNextFormat() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.getNextFormat();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void skipValue(int count) {
assert refCnt > 0 : "Unpacker is closed";
try {
super.skipValue(count);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void skipValue() {
assert refCnt > 0 : "Unpacker is closed";
try {
super.skipValue();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.hasNext();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public ImmutableValue unpackValue() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackValue();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
* Reads an UUID.
*
* @return UUID value.
* @throws MessageTypeException when type is not UUID.
* @throws MessageSizeException when size is not correct.
*/
public UUID unpackUuid() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.UUID)
throw new MessageTypeException("Expected UUID extension (3), but got " + type);
if (len != 16)
throw new MessageSizeException("Expected 16 bytes for UUID extension, but got " + len, len);
var bytes = readPayload(16);
ByteBuffer bb = ByteBuffer.wrap(bytes);
return new UUID(bb.getLong(), bb.getLong());
}
/**
* Reads an {@link IgniteUuid}.
*
* @return {@link IgniteUuid} value.
* @throws MessageTypeException when type is not {@link IgniteUuid}.
* @throws MessageSizeException when size is not correct.
*/
public IgniteUuid unpackIgniteUuid() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.IGNITE_UUID)
throw new MessageTypeException("Expected Ignite UUID extension (1), but got " + type);
if (len != 24)
throw new MessageSizeException("Expected 24 bytes for UUID extension, but got " + len, len);
var bytes = readPayload(24);
ByteBuffer bb = ByteBuffer.wrap(bytes);
return new IgniteUuid(new UUID(bb.getLong(), bb.getLong()), bb.getLong());
}
/**
* Reads a decimal.
*
* @return Decimal value.
* @throws MessageTypeException when type is not Decimal.
*/
public BigDecimal unpackDecimal() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.DECIMAL)
throw new MessageTypeException("Expected DECIMAL extension (2), but got " + type);
var bytes = readPayload(len);
ByteBuffer bb = ByteBuffer.wrap(bytes);
int scale = bb.getInt();
return new BigDecimal(new BigInteger(bytes, bb.position(), bb.remaining()), scale);
}
/**
* Reads a bit set.
*
* @return Bit set.
* @throws MessageTypeException when type is not BitSet.
*/
public BitSet unpackBitSet() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.BITMASK)
throw new MessageTypeException("Expected BITSET extension (7), but got " + type);
var bytes = readPayload(len);
return BitSet.valueOf(bytes);
}
/**
* Reads a number.
*
* @return BigInteger value.
* @throws MessageTypeException when type is not BigInteger.
*/
public BigInteger unpackNumber() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.NUMBER)
throw new MessageTypeException("Expected NUMBER extension (1), but got " + type);
var bytes = readPayload(len);
return new BigInteger(bytes);
}
/**
* Reads an integer array.
*
* @return Integer array.
*/
public int[] unpackIntArray() {
assert refCnt > 0 : "Unpacker is closed";
int size = unpackArrayHeader();
if (size == 0)
return ArrayUtils.INT_EMPTY_ARRAY;
int[] res = new int[size];
for (int i = 0; i < size; i++)
res[i] = unpackInt();
return res;
}
/**
* Reads a date.
*
* @return Date value.
* @throws MessageTypeException when type is not DATE.
* @throws MessageSizeException when size is not correct.
*/
public LocalDate unpackDate() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.DATE)
throw new MessageTypeException("Expected DATE extension (4), but got " + type);
if (len != 6)
throw new MessageSizeException("Expected 6 bytes for DATE extension, but got " + len, len);
var data = ByteBuffer.wrap(readPayload(len));
return LocalDate.of(data.getInt(), data.get(), data.get());
}
/**
* Reads a time.
*
* @return Time value.
* @throws MessageTypeException when type is not TIME.
* @throws MessageSizeException when size is not correct.
*/
public LocalTime unpackTime() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.TIME)
throw new MessageTypeException("Expected TIME extension (5), but got " + type);
if (len != 7)
throw new MessageSizeException("Expected 7 bytes for TIME extension, but got " + len, len);
var data = ByteBuffer.wrap(readPayload(len));
return LocalTime.of(data.get(), data.get(), data.get(), data.getInt());
}
/**
* Reads a datetime.
*
* @return Datetime value.
* @throws MessageTypeException when type is not DATETIME.
* @throws MessageSizeException when size is not correct.
*/
public LocalDateTime unpackDateTime() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.DATETIME)
throw new MessageTypeException("Expected DATETIME extension (6), but got " + type);
if (len != 13)
throw new MessageSizeException("Expected 13 bytes for DATETIME extension, but got " + len, len);
var data = ByteBuffer.wrap(readPayload(len));
return LocalDateTime.of(
LocalDate.of(data.getInt(), data.get(), data.get()),
LocalTime.of(data.get(), data.get(), data.get(), data.getInt())
);
}
/**
* Reads a timestamp.
*
* @return Timestamp value.
* @throws MessageTypeException when type is not TIMESTAMP.
* @throws MessageSizeException when size is not correct.
*/
public Instant unpackTimestamp() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.TIMESTAMP)
throw new MessageTypeException("Expected TIMESTAMP extension (6), but got " + type);
if (len != 12)
throw new MessageSizeException("Expected 12 bytes for TIMESTAMP extension, but got " + len, len);
var data = ByteBuffer.wrap(readPayload(len));
return Instant.ofEpochSecond(data.getLong(), data.getInt());
}
/**
* Unpacks an object based on the specified type.
*
* @param dataType Data type code.
*
* @return Unpacked object.
* @throws IgniteException when data type is not valid.
*/
public Object unpackObject(int dataType) {
if (tryUnpackNil())
return null;
switch (dataType) {
case BOOLEAN:
return unpackBoolean();
case INT8:
return unpackByte();
case INT16:
return unpackShort();
case INT32:
return unpackInt();
case INT64:
return unpackLong();
case FLOAT:
return unpackFloat();
case DOUBLE:
return unpackDouble();
case ClientDataType.UUID:
return unpackUuid();
case STRING:
return unpackString();
case BYTES: {
var cnt = unpackBinaryHeader();
return readPayload(cnt);
}
case DECIMAL:
return unpackDecimal();
case BIGINTEGER:
return unpackBigInteger();
case BITMASK:
return unpackBitSet();
case NUMBER:
return unpackNumber();
case DATE:
return unpackDate();
case TIME:
return unpackTime();
case DATETIME:
return unpackDateTime();
case TIMESTAMP:
return unpackTimestamp();
}
throw new IgniteException("Unknown client data type: " + dataType);
}
/**
* Packs an object.
*
* @return Object array.
* @throws IllegalStateException in case of unexpected value type.
*/
public Object[] unpackObjectArray() {
assert refCnt > 0 : "Unpacker is closed";
if (tryUnpackNil())
return null;
int size = unpackArrayHeader();
if (size == 0)
return ArrayUtils.OBJECT_EMPTY_ARRAY;
Object[] args = new Object[size];
for (int i = 0; i < size; i++) {
if (tryUnpackNil())
continue;
args[i] = unpackObject(unpackInt());
}
return args;
}
/**
* Creates a copy of this unpacker and the underlying buffer.
*
* @return Copied unpacker.
* @throws UncheckedIOException When buffer operation fails.
*/
public ClientMessageUnpacker copy() {
try {
in.reset(new ByteBufInputStream(buf.copy()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return this;
}
/**
* Increases the reference count by {@code 1}.
*
* @return This instance.
*/
public ClientMessageUnpacker retain() {
refCnt++;
buf.retain();
return this;
}
/** {@inheritDoc} */
@Override public void close() {
if (refCnt == 0)
return;
refCnt--;
if (buf.refCnt() > 0)
buf.release();
}
}