blob: a058c9c23617f5866e57d1649fa7c9bcc67ab0f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client.proto;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.ignite.lang.IgniteException;
import org.msgpack.core.ExtensionTypeHeader;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageSizeException;
import org.msgpack.core.MessageTypeException;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.buffer.InputStreamBufferInput;
import org.msgpack.value.ImmutableValue;
import static org.apache.ignite.client.proto.ClientDataType.BITMASK;
import static org.apache.ignite.client.proto.ClientDataType.BYTES;
import static org.apache.ignite.client.proto.ClientDataType.DECIMAL;
import static org.apache.ignite.client.proto.ClientDataType.DOUBLE;
import static org.apache.ignite.client.proto.ClientDataType.FLOAT;
import static org.apache.ignite.client.proto.ClientDataType.INT16;
import static org.apache.ignite.client.proto.ClientDataType.INT32;
import static org.apache.ignite.client.proto.ClientDataType.INT64;
import static org.apache.ignite.client.proto.ClientDataType.INT8;
import static org.apache.ignite.client.proto.ClientDataType.STRING;
/**
* Ignite-specific MsgPack extension based on Netty ByteBuf.
* <p>
* Releases wrapped buffer on {@link #close()} .
*/
public class ClientMessageUnpacker extends MessageUnpacker {
/** Underlying buffer. */
private final ByteBuf buf;
/** Underlying input. */
private final InputStreamBufferInput in;
/** Ref count. */
private int refCnt = 1;
/**
* Constructor.
*
* @param buf Input.
*/
public ClientMessageUnpacker(ByteBuf buf) {
// TODO: Remove intermediate classes and buffers IGNITE-15234.
this(new InputStreamBufferInput(new ByteBufInputStream(buf)), buf);
}
private ClientMessageUnpacker(InputStreamBufferInput in, ByteBuf buf) {
super(in, MessagePack.DEFAULT_UNPACKER_CONFIG);
this.in = in;
this.buf = buf;
}
/** {@inheritDoc} */
@Override public int unpackInt() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackInt();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public String unpackString() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackString();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void unpackNil() {
assert refCnt > 0 : "Unpacker is closed";
try {
super.unpackNil();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean unpackBoolean() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBoolean();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public byte unpackByte() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackByte();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public short unpackShort() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackShort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public long unpackLong() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackLong();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public BigInteger unpackBigInteger() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBigInteger();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public float unpackFloat() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackFloat();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public double unpackDouble() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackDouble();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackArrayHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackArrayHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackMapHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackMapHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public ExtensionTypeHeader unpackExtensionTypeHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackExtensionTypeHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public int unpackBinaryHeader() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackBinaryHeader();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean tryUnpackNil() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.tryUnpackNil();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public byte[] readPayload(int length) {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.readPayload(length);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public MessageFormat getNextFormat() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.getNextFormat();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void skipValue(int count) {
assert refCnt > 0 : "Unpacker is closed";
try {
super.skipValue(count);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public void skipValue() {
assert refCnt > 0 : "Unpacker is closed";
try {
super.skipValue();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.hasNext();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/** {@inheritDoc} */
@Override public ImmutableValue unpackValue() {
assert refCnt > 0 : "Unpacker is closed";
try {
return super.unpackValue();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
* Reads an UUID.
*
* @return UUID value.
* @throws MessageTypeException when type is not UUID.
* @throws MessageSizeException when size is not correct.
*/
public UUID unpackUuid() {
assert refCnt > 0 : "Unpacker is closed";
var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();
var len = hdr.getLength();
if (type != ClientMsgPackType.UUID)
throw new MessageTypeException("Expected UUID extension (1), but got " + type);
if (len != 16)
throw new MessageSizeException("Expected 16 bytes for UUID extension, but got " + len, len);
var bytes = readPayload(16);
ByteBuffer bb = ByteBuffer.wrap(bytes);
return new UUID(bb.getLong(), bb.getLong());
}
/**
* Reads a decimal.
*
* @return Decimal value.
* @throws UnsupportedOperationException Not supported yet.
*/
public BigDecimal unpackDecimal() {
assert refCnt > 0 : "Unpacker is closed";
throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
* Reads a bit set.
*
* @return Bit set.
* @throws UnsupportedOperationException Not supported yet.
*/
public BitSet unpackBitSet() {
assert refCnt > 0 : "Unpacker is closed";
throw new UnsupportedOperationException("TODO: IGNITE-15163");
}
/**
* Unpacks an object based on the specified type.
*
* @param dataType Data type code.
*
* @return Unpacked object.
* @throws IgniteException when data type is not valid.
*/
public Object unpackObject(int dataType) {
if (tryUnpackNil())
return null;
switch (dataType) {
case INT8:
return unpackByte();
case INT16:
return unpackShort();
case INT32:
return unpackInt();
case INT64:
return unpackLong();
case FLOAT:
return unpackFloat();
case DOUBLE:
return unpackDouble();
case ClientDataType.UUID:
return unpackUuid();
case STRING:
return unpackString();
case BYTES:
var cnt = unpackBinaryHeader();
return readPayload(cnt);
case DECIMAL:
return unpackDecimal();
case BITMASK:
return unpackBitSet();
}
throw new IgniteException("Unknown client data type: " + dataType);
}
/**
* Creates a copy of this unpacker and the underlying buffer.
*
* @return Copied unpacker.
* @throws UncheckedIOException When buffer operation fails.
*/
public ClientMessageUnpacker copy() {
try {
in.reset(new ByteBufInputStream(buf.copy()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return this;
}
/**
* Increases the reference count by {@code 1}.
*
* @return This instance.
*/
public ClientMessageUnpacker retain() {
refCnt++;
buf.retain();
return this;
}
/** {@inheritDoc} */
@Override public void close() {
if (refCnt == 0)
return;
refCnt--;
if (buf.refCnt() > 0)
buf.release();
}
}