| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.client.proto; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.nio.ByteBuffer; |
| import java.sql.Date; |
| import java.sql.Time; |
| import java.sql.Timestamp; |
| import java.time.Instant; |
| import java.time.LocalDate; |
| import java.time.LocalDateTime; |
| import java.time.LocalTime; |
| import java.util.BitSet; |
| import java.util.UUID; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufOutputStream; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.msgpack.core.MessagePack; |
| import org.msgpack.core.MessagePacker; |
| import org.msgpack.core.buffer.OutputStreamBufferOutput; |
| import org.msgpack.value.Value; |
| |
| import static org.apache.ignite.internal.client.proto.ClientMessageCommon.HEADER_SIZE; |
| |
| /** |
| * Ignite-specific MsgPack extension based on Netty ByteBuf. |
| * <p> |
| * Releases wrapped buffer on {@link #close()} . |
| */ |
| public class ClientMessagePacker extends MessagePacker { |
| /** Underlying buffer. */ |
| private final ByteBuf buf; |
| |
| /** Closed flag. */ |
| private boolean closed = false; |
| |
| /** |
| * Constructor. |
| * |
| * @param buf Buffer. |
| */ |
| public ClientMessagePacker(ByteBuf buf) { |
| // TODO: Remove intermediate classes and buffers IGNITE-15234. |
| // Reserve 4 bytes for the message length. |
| super(new OutputStreamBufferOutput(new ByteBufOutputStream(buf.writerIndex(HEADER_SIZE))), |
| MessagePack.DEFAULT_PACKER_CONFIG); |
| |
| this.buf = buf; |
| } |
| |
| /** |
| * Gets the underlying buffer. |
| * |
| * @return Underlying buffer. |
| * @throws UncheckedIOException When flush fails. |
| */ |
| public ByteBuf getBuffer() { |
| try { |
| flush(); |
| } |
| catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| |
| buf.setInt(0, buf.writerIndex() - HEADER_SIZE); |
| |
| return buf; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packNil() { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packNil(); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packBoolean(boolean b) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packBoolean(b); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packByte(byte b) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packByte(b); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packShort(short v) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packShort(v); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packInt(int r) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packInt(r); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packLong(long v) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packLong(v); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packBigInteger(BigInteger bi) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packBigInteger(bi); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packFloat(float v) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packFloat(v); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packDouble(double v) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packDouble(v); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packString(String s) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packString(s); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packArrayHeader(int arraySize) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packArrayHeader(arraySize); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packMapHeader(int mapSize) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packMapHeader(mapSize); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packValue(Value v) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packValue(v); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packExtensionTypeHeader(byte extType, int payloadLen) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packExtensionTypeHeader(extType, payloadLen); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packBinaryHeader(int len) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packBinaryHeader(len); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker packRawStringHeader(int len) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.packRawStringHeader(len); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker writePayload(byte[] src) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.writePayload(src); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker writePayload(byte[] src, int off, int len) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.writePayload(src, off, len); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker addPayload(byte[] src) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.addPayload(src); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MessagePacker addPayload(byte[] src, int off, int len) { |
| assert !closed : "Packer is closed"; |
| |
| try { |
| return super.addPayload(src, off, len); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /** |
| * Writes an UUID. |
| * |
| * @param val UUID value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packUuid(UUID val) { |
| assert !closed : "Packer is closed"; |
| |
| packExtensionTypeHeader(ClientMsgPackType.UUID, 16); |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| var bytes = new byte[16]; |
| ByteBuffer bb = ByteBuffer.wrap(bytes); |
| |
| bb.putLong(val.getMostSignificantBits()); |
| bb.putLong(val.getLeastSignificantBits()); |
| |
| addPayload(bytes); |
| |
| return this; |
| } |
| |
| /** |
| * Writes an {@link IgniteUuid}. |
| * |
| * @param val {@link IgniteUuid} value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packIgniteUuid(IgniteUuid val) { |
| assert !closed : "Packer is closed"; |
| |
| packExtensionTypeHeader(ClientMsgPackType.IGNITE_UUID, 24); |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| var bytes = new byte[24]; |
| ByteBuffer bb = ByteBuffer.wrap(bytes); |
| |
| UUID globalId = val.globalId(); |
| |
| bb.putLong(globalId.getMostSignificantBits()); |
| bb.putLong(globalId.getLeastSignificantBits()); |
| |
| bb.putLong(val.localId()); |
| |
| writePayload(bytes); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a decimal. |
| * |
| * @param val Decimal value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packDecimal(BigDecimal val) { |
| assert !closed : "Packer is closed"; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| byte[] unscaledValue = val.unscaledValue().toByteArray(); |
| |
| packExtensionTypeHeader(ClientMsgPackType.DECIMAL, 4 + unscaledValue.length); // Scale length + data length |
| |
| addPayload(ByteBuffer.wrap(new byte[4]).putInt(val.scale()).array()); |
| addPayload(unscaledValue); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a decimal. |
| * |
| * @param val Decimal value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packNumber(BigInteger val) { |
| assert !closed : "Packer is closed"; |
| |
| byte[] data = val.toByteArray(); |
| |
| packExtensionTypeHeader(ClientMsgPackType.NUMBER, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a bit set. |
| * |
| * @param val Bit set value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packBitSet(BitSet val) { |
| assert !closed : "Packer is closed"; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| byte[] data = val.toByteArray(); |
| |
| packExtensionTypeHeader(ClientMsgPackType.BITMASK, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Writes an integer array. |
| * |
| * @param arr Integer array value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packIntArray(int[] arr) { |
| assert !closed : "Packer is closed"; |
| |
| if (arr == null) { |
| packNil(); |
| |
| return this; |
| } |
| |
| packArrayHeader(arr.length); |
| |
| for (int i : arr) |
| packInt(i); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a date. |
| * |
| * @param val Date value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packDate(LocalDate val) { |
| assert !closed : "Packer is closed"; |
| |
| byte[] data = new byte[6]; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| ByteBuffer.wrap(data) |
| .putInt(val.getYear()) |
| .put((byte)val.getMonthValue()) |
| .put((byte)val.getDayOfMonth()); |
| |
| packExtensionTypeHeader(ClientMsgPackType.DATE, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a time. |
| * |
| * @param val Time value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packTime(LocalTime val) { |
| assert !closed : "Packer is closed"; |
| |
| byte[] data = new byte[7]; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| ByteBuffer.wrap(data) |
| .put((byte)val.getHour()) |
| .put((byte)val.getMinute()) |
| .put((byte)val.getSecond()) |
| .putInt(val.getNano()); |
| |
| packExtensionTypeHeader(ClientMsgPackType.TIME, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a datetime. |
| * |
| * @param val Datetime value. |
| * @return This instance. |
| */ |
| public ClientMessagePacker packDateTime(LocalDateTime val) { |
| assert !closed : "Packer is closed"; |
| |
| byte[] data = new byte[13]; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| ByteBuffer.wrap(data) |
| .putInt(val.getYear()) |
| .put((byte)val.getMonthValue()) |
| .put((byte)val.getDayOfMonth()) |
| .put((byte)val.getHour()) |
| .put((byte)val.getMinute()) |
| .put((byte)val.getSecond()) |
| .putInt(val.getNano()); |
| |
| packExtensionTypeHeader(ClientMsgPackType.DATETIME, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Writes a timestamp. |
| * |
| * @param val Timestamp value. |
| * @return This instance. |
| * @throws UnsupportedOperationException Not supported. |
| */ |
| public ClientMessagePacker packTimestamp(Instant val) { |
| assert !closed : "Packer is closed"; |
| |
| byte[] data = new byte[12]; |
| |
| // TODO: Pack directly to ByteBuf without allocating IGNITE-15234. |
| ByteBuffer.wrap(data) |
| .putLong(val.getEpochSecond()) |
| .putInt(val.getNano()); |
| |
| packExtensionTypeHeader(ClientMsgPackType.TIMESTAMP, data.length); |
| |
| addPayload(data); |
| |
| return this; |
| } |
| |
| /** |
| * Packs an object. |
| * |
| * @param val Object value. |
| * @return This instance. |
| * @throws UnsupportedOperationException When type is not supported. |
| */ |
| public ClientMessagePacker packObject(Object val) { |
| if (val == null) |
| return (ClientMessagePacker)packNil(); |
| |
| if (val instanceof Byte) |
| return (ClientMessagePacker)packByte((byte)val); |
| |
| if (val instanceof Short) |
| return (ClientMessagePacker)packShort((short)val); |
| |
| if (val instanceof Integer) |
| return (ClientMessagePacker)packInt((int)val); |
| |
| if (val instanceof Long) |
| return (ClientMessagePacker)packLong((long)val); |
| |
| if (val instanceof Float) |
| return (ClientMessagePacker)packFloat((float)val); |
| |
| if (val instanceof Double) |
| return (ClientMessagePacker)packDouble((double)val); |
| |
| if (val instanceof UUID) |
| return packUuid((UUID)val); |
| |
| if (val instanceof String) |
| return (ClientMessagePacker)packString((String)val); |
| |
| if (val instanceof byte[]) { |
| byte[] bytes = (byte[])val; |
| packBinaryHeader(bytes.length); |
| writePayload(bytes); |
| |
| return this; |
| } |
| |
| if (val instanceof BigDecimal) |
| return packDecimal((BigDecimal)val); |
| |
| if (val instanceof BigInteger) |
| return packNumber((BigInteger)val); |
| |
| if (val instanceof BitSet) |
| return packBitSet((BitSet)val); |
| |
| if (val instanceof LocalDate) |
| return packDate((LocalDate)val); |
| |
| if (val instanceof LocalTime) |
| return packTime((LocalTime)val); |
| |
| if (val instanceof LocalDateTime) |
| return packDateTime((LocalDateTime)val); |
| |
| if (val instanceof Instant) |
| return packTimestamp((Instant)val); |
| |
| throw new UnsupportedOperationException("Unsupported type, can't serialize: " + val.getClass()); |
| } |
| |
| /** |
| * Packs an array of different objects. |
| * |
| * @param args Object array. |
| * @return This instance. |
| * @throws UnsupportedOperationException in case of unknown type. |
| */ |
| public ClientMessagePacker packObjectArray(Object[] args) { |
| assert !closed : "Packer is closed"; |
| |
| if (args == null) { |
| packNil(); |
| |
| return this; |
| } |
| |
| packArrayHeader(args.length); |
| |
| for (Object arg : args) { |
| if (arg == null) { |
| packNil(); |
| |
| continue; |
| } |
| |
| Class<?> cls = arg.getClass(); |
| |
| if (cls == Boolean.class) { |
| packInt(ClientDataType.BOOLEAN); |
| packBoolean((Boolean)arg); |
| } |
| else if (cls == Byte.class) { |
| packInt(ClientDataType.INT8); |
| packByte((Byte)arg); |
| } |
| else if (cls == Short.class) { |
| packInt(ClientDataType.INT16); |
| packShort((Short)arg); |
| } |
| else if (cls == Integer.class) { |
| packInt(ClientDataType.INT32); |
| packInt((Integer)arg); |
| } |
| else if (cls == Long.class) { |
| packInt(ClientDataType.INT64); |
| packLong((Long)arg); |
| } |
| else if (cls == Float.class) { |
| packInt(ClientDataType.FLOAT); |
| packFloat((Float)arg); |
| } |
| else if (cls == Double.class) { |
| packInt(ClientDataType.DOUBLE); |
| packDouble((Double)arg); |
| } |
| else if (cls == String.class) { |
| packInt(ClientDataType.STRING); |
| packString((String)arg); |
| } |
| else if (cls == UUID.class) { |
| packInt(ClientDataType.UUID); |
| packUuid((UUID)arg); |
| } |
| else if (cls == LocalDate.class) { |
| packInt(ClientDataType.DATE); |
| packDate((LocalDate)arg); |
| } |
| else if (cls == LocalTime.class) { |
| packInt(ClientDataType.TIME); |
| packTime((LocalTime)arg); |
| } |
| else if (cls == LocalDateTime.class) { |
| packInt(ClientDataType.DATETIME); |
| packDateTime((LocalDateTime)arg); |
| } |
| else if (cls == Instant.class) { |
| packInt(ClientDataType.TIMESTAMP); |
| packTimestamp((Instant)arg); |
| } |
| else if (cls == byte[].class) { |
| packInt(ClientDataType.BYTES); |
| |
| packBinaryHeader(((byte[])arg).length); |
| writePayload((byte[])arg); |
| } |
| else if (cls == Date.class) { |
| packInt(ClientDataType.DATE); |
| packDate(((Date)arg).toLocalDate()); |
| } |
| else if (cls == Time.class) { |
| packInt(ClientDataType.TIME); |
| packTime(((Time)arg).toLocalTime()); |
| } |
| else if (cls == Timestamp.class) { |
| packInt(ClientDataType.TIMESTAMP); |
| packTimestamp(((java.util.Date)arg).toInstant()); |
| } |
| else |
| throw new UnsupportedOperationException("Custom objects are not supported"); |
| } |
| |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() { |
| if (closed) |
| return; |
| |
| closed = true; |
| |
| if (buf.refCnt() > 0) |
| buf.release(); |
| } |
| } |