blob: c1b97d3a32b2265fa2dd0a05f6f4c0a69e010966 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto.decoder.primitive;
import io.airlift.slice.Slices;
import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.sql.presto.PulsarColumnHandle;
import org.apache.pulsar.sql.presto.PulsarRowDecoder;
import org.apache.pulsar.sql.presto.decoder.AbstractDecoderTester;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static io.prestosql.spi.type.DateType.DATE;
import static io.prestosql.spi.type.SmallintType.SMALLINT;
import static io.prestosql.spi.type.TimeType.TIME;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.VarcharType.VARCHAR;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.BooleanType.BOOLEAN;
import static io.prestosql.spi.type.DoubleType.DOUBLE;
import static io.prestosql.spi.type.IntegerType.INTEGER;
import static io.prestosql.spi.type.RealType.REAL;
import static org.apache.pulsar.sql.presto.TestPulsarConnector.getPulsarConnectorId;
public class TestPrimitiveDecoder extends AbstractDecoderTester {
public static final String PRIMITIVE_COLUMN_NAME = "__value__";
@BeforeMethod
public void init() {
decoderTestUtil = new PrimitiveDecoderTestUtil();
super.init();
}
@Test(singleThreaded = true)
public void testPrimitiveType() {
byte int8Value = 1;
SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build();
Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8,
new HashSet<>(pulsarColumnHandleInt8));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowInt8 =
pulsarRowDecoderInt8.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaInt8.encode(int8Value))).get();
checkValue(decodedRowInt8, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value);
short int16Value = 2;
SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build();
Schema schemaInt16 = Schema.getSchema(schemaInfoInt16);
List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16,
new HashSet<>(pulsarColumnHandleInt16));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowInt16 =
pulsarRowDecoderInt16.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaInt16.encode(int16Value))).get();
checkValue(decodedRowInt16, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value);
int int32Value = 2;
SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build();
Schema schemaInt32 = Schema.getSchema(schemaInfoInt32);
List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt32 = decoderFactory.createRowDecoder(topicName, schemaInfoInt32,
new HashSet<>(pulsarColumnHandleInt32));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowInt32 =
pulsarRowDecoderInt32.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaInt32.encode(int32Value))).get();
checkValue(decodedRowInt32, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value);
long int64Value = 2;
SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build();
Schema schemaInt64 = Schema.getSchema(schemaInfoInt64);
List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderInt64 = decoderFactory.createRowDecoder(topicName, schemaInfoInt64,
new HashSet<>(pulsarColumnHandleInt64));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowInt64 =
pulsarRowDecoderInt64.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaInt64.encode(int64Value))).get();
checkValue(decodedRowInt64, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, BIGINT, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), int64Value);
String stringValue = "test";
SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build();
Schema schemaString = Schema.getSchema(schemaInfoString);
List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderString = decoderFactory.createRowDecoder(topicName, schemaInfoString,
new HashSet<>(pulsarColumnHandleString));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowString =
pulsarRowDecoderString.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaString.encode(stringValue))).get();
checkValue(decodedRowString, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, VARCHAR, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), stringValue);
float floatValue = 0.2f;
SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build();
Schema schemaFloat = Schema.getSchema(schemaInfoFloat);
List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderFloat = decoderFactory.createRowDecoder(topicName, schemaInfoFloat,
new HashSet<>(pulsarColumnHandleFloat));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowFloat =
pulsarRowDecoderFloat.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaFloat.encode(floatValue))).get();
checkValue(decodedRowFloat, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, REAL, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue)));
double doubleValue = 0.22d;
SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build();
Schema schemaDouble = Schema.getSchema(schemaInfoDouble);
List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderDouble = decoderFactory.createRowDecoder(topicName, schemaInfoDouble,
new HashSet<>(pulsarColumnHandleDouble));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowDouble =
pulsarRowDecoderDouble.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaDouble.encode(doubleValue))).get();
checkValue(decodedRowDouble, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, DOUBLE, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue);
boolean booleanValue = true;
SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build();
Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean);
List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderBoolean = decoderFactory.createRowDecoder(topicName, schemaInfoBoolean,
new HashSet<>(pulsarColumnHandleBoolean));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowBoolean =
pulsarRowDecoderBoolean.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaBoolean.encode(booleanValue))).get();
checkValue(decodedRowBoolean, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, BOOLEAN, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), booleanValue);
byte[] bytesValue = new byte[1];
bytesValue[0] = 1;
SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build();
Schema schemaBytes = Schema.getSchema(schemaInfoBytes);
List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderBytes = decoderFactory.createRowDecoder(topicName, schemaInfoBytes,
new HashSet<>(pulsarColumnHandleBytes));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowBytes =
pulsarRowDecoderBytes.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaBytes.encode(bytesValue))).get();
checkValue(decodedRowBytes, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, VARBINARY, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue));
Date dateValue = new Date(System.currentTimeMillis());
SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build();
Schema schemaDate = Schema.getSchema(schemaInfoDate);
List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderDate = decoderFactory.createRowDecoder(topicName, schemaInfoDate,
new HashSet<>(pulsarColumnHandleDate));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowDate =
pulsarRowDecoderDate.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaDate.encode(dateValue))).get();
checkValue(decodedRowDate, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, DATE, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
Time timeValue = new Time(System.currentTimeMillis());
SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build();
Schema schemaTime = Schema.getSchema(schemaInfoTime);
List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderTime = decoderFactory.createRowDecoder(topicName, schemaInfoTime,
new HashSet<>(pulsarColumnHandleTime));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowTime =
pulsarRowDecoderTime.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaTime.encode(timeValue))).get();
checkValue(decodedRowTime, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, TIME, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build();
Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp);
List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp,
PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
PulsarRowDecoder pulsarRowDecoderTimestamp = decoderFactory.createRowDecoder(topicName, schemaInfoTimestamp,
new HashSet<>(pulsarColumnHandleTimestamp));
Map<DecoderColumnHandle, FieldValueProvider> decodedRowTimestamp =
pulsarRowDecoderTimestamp.decodeRow(io.netty.buffer.Unpooled
.copiedBuffer(schemaTimestamp.encode(timestampValue))).get();
checkValue(decodedRowTimestamp, new PulsarColumnHandle(getPulsarConnectorId().toString(),
PRIMITIVE_COLUMN_NAME, TIMESTAMP, false, false, PRIMITIVE_COLUMN_NAME, null, null,
PulsarColumnHandle.HandleKeyValueType.NONE), timestampValue.getTime());
}
}