blob: 1fe881ee3999e7e90db4d87c5e2cd0d581ae2655 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.prestosql.spi.connector.ColumnMetadata;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.api.raw.RawMessageImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
/**
* Unit test for KeyValueSchemaHandler
*/
@Slf4j
public class TestPulsarKeyValueSchemaHandler {
private final static ObjectMapper objectMapper = new ObjectMapper();
private Schema<KeyValue<String, Integer>> schema1 =
Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE);
private Schema<KeyValue<String, Foo>> schema2 =
Schema.KeyValue(Schema.STRING, Schema.JSON(Foo.class), KeyValueEncodingType.INLINE);
private Schema<KeyValue<Boo, Long>> schema3 =
Schema.KeyValue(Schema.AVRO(Boo.class), Schema.INT64, KeyValueEncodingType.SEPARATED);
private Schema<KeyValue<Boo, Foo>> schema4 =
Schema.KeyValue(Schema.JSON(Boo.class), Schema.AVRO(Foo.class), KeyValueEncodingType.SEPARATED);
private final static TopicName topicName = TopicName.get("persistent://public/default/kv-test");
private final static Foo foo;
private final static Boo boo;
private final Integer KEY_FIELD_NAME_PREFIX_LENGTH = PulsarColumnMetadata.KEY_SCHEMA_COLUMN_PREFIX.length();
static {
foo = new Foo();
foo.field1 = "field1-value";
foo.field2 = 20;
boo = new Boo();
boo.field1 = "field1-value";
boo.field2 = true;
boo.field3 = 10.2;
}
@Test
public void testSchema1() throws IOException {
final String keyData = "test-key";
final Integer valueData = 10;
List<ColumnMetadata> columnMetadataList =
PulsarMetadata.getPulsarColumns(topicName, schema1.getSchemaInfo(),
true, null);
int keyCount = 0;
int valueCount = 0;
for (ColumnMetadata columnMetadata : columnMetadataList) {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
if (pulsarColumnMetadata.isKey()) {
keyCount++;
} else if (pulsarColumnMetadata.isValue()) {
valueCount++;
}
}
Assert.assertEquals(keyCount, 1);
Assert.assertEquals(valueCount, 1);
List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
KeyValueSchemaHandler keyValueSchemaHandler =
new KeyValueSchemaHandler(null, null,schema1.getSchemaInfo(), columnHandleList);
RawMessageImpl message = mock(RawMessageImpl.class);
Mockito.when(message.getData()).thenReturn(
Unpooled.wrappedBuffer(schema1.encode(new KeyValue<>(keyData, valueData)))
);
KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema1);
Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData);
Assert.assertEquals(keyValueSchemaHandler.extractField(1, object), valueData);
}
@Test
public void testSchema2() throws IOException {
final String keyData = "test-key";
List<ColumnMetadata> columnMetadataList =
PulsarMetadata.getPulsarColumns(topicName, schema2.getSchemaInfo(),
true, null);
int keyCount = 0;
int valueCount = 0;
for (ColumnMetadata columnMetadata : columnMetadataList) {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
if (pulsarColumnMetadata.isKey()) {
keyCount++;
} else if (pulsarColumnMetadata.isValue()) {
valueCount++;
}
}
Assert.assertEquals(keyCount, 1);
Assert.assertEquals(valueCount, 2);
List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
RawMessage message = mock(RawMessage.class);
Mockito.when(message.getData()).thenReturn(
Unpooled.wrappedBuffer(schema2.encode(new KeyValue<>(keyData, foo)))
);
KeyValueSchemaHandler keyValueSchemaHandler =
new KeyValueSchemaHandler(null, null, schema2.getSchemaInfo(), columnHandleList);
KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema2);
Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
Assert.assertEquals(keyValueSchemaHandler.extractField(0, object), keyData);
Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
foo.getValue(columnHandleList.get(1).getName()));
Assert.assertEquals(keyValueSchemaHandler.extractField(2, object),
foo.getValue(columnHandleList.get(2).getName()));
}
@Test
public void testSchema3() throws IOException {
final Boo boo = new Boo();
boo.field1 = "field1-value";
boo.field2 = true;
boo.field3 = 10.2;
final Long valueData = 999999L;
List<ColumnMetadata> columnMetadataList =
PulsarMetadata.getPulsarColumns(topicName, schema3.getSchemaInfo(),
true, null);
int keyCount = 0;
int valueCount = 0;
for (ColumnMetadata columnMetadata : columnMetadataList) {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
if (pulsarColumnMetadata.isKey()) {
keyCount++;
} else if (pulsarColumnMetadata.isValue()) {
valueCount++;
}
}
Assert.assertEquals(keyCount, 3);
Assert.assertEquals(valueCount, 1);
List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schema3.getSchemaInfo());
AvroSchemaHandler avroSchemaHandler =
new AvroSchemaHandler(pulsarSqlSchemaInfoProvider, kvSchemaInfo.getKey(), columnHandleList);
PulsarPrimitiveSchemaHandler pulsarPrimitiveSchemaHandler =
new PulsarPrimitiveSchemaHandler(kvSchemaInfo.getValue());
KeyValueSchemaHandler keyValueSchemaHandler =
new KeyValueSchemaHandler(avroSchemaHandler, pulsarPrimitiveSchemaHandler, columnHandleList);
RawMessage message = mock(RawMessage.class);
Mockito.when(message.getKeyBytes()).thenReturn(
Optional.of(Unpooled.wrappedBuffer(
((KeyValueSchema) schema3).getKeySchema().encode(boo)
))
);
Mockito.when(message.getData()).thenReturn(
Unpooled.wrappedBuffer(schema3.encode(new KeyValue<>(boo, valueData)))
);
KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema3);
Integer a = 1;
Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(),
boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(2, object),
boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(3, object), valueData);
}
@Test
public void testSchema4() throws IOException {
List<ColumnMetadata> columnMetadataList =
PulsarMetadata.getPulsarColumns(topicName, schema4.getSchemaInfo(),
true, null);
int keyCount = 0;
int valueCount = 0;
for (ColumnMetadata columnMetadata : columnMetadataList) {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
if (pulsarColumnMetadata.isKey()) {
keyCount++;
} else if (pulsarColumnMetadata.isValue()) {
valueCount++;
}
}
Assert.assertEquals(keyCount, 3);
Assert.assertEquals(valueCount, 2);
List<PulsarColumnHandle> columnHandleList = getColumnHandlerList(columnMetadataList);
PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo =
KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schema4.getSchemaInfo());
AvroSchemaHandler avroSchemaHandler =
new AvroSchemaHandler(pulsarSqlSchemaInfoProvider, kvSchemaInfo.getValue(), columnHandleList);
JSONSchemaHandler jsonSchemaHandler = new JSONSchemaHandler(columnHandleList);
KeyValueSchemaHandler keyValueSchemaHandler =
new KeyValueSchemaHandler(jsonSchemaHandler, avroSchemaHandler, columnHandleList);
RawMessage message = mock(RawMessage.class);
Mockito.when(message.getKeyBytes()).thenReturn(
Optional.of(Unpooled.wrappedBuffer(
((KeyValueSchema) schema4).getKeySchema().encode(boo)
))
);
Mockito.when(message.getData()).thenReturn(
Unpooled.wrappedBuffer(schema4.encode(new KeyValue<>(boo, foo)))
);
KeyValue<ByteBuf, ByteBuf> byteBufKeyValue = getKeyValueByteBuf(message, schema4);
Object object = keyValueSchemaHandler.deserialize(byteBufKeyValue.getKey(), byteBufKeyValue.getValue(), null);
Assert.assertEquals(keyValueSchemaHandler.extractField(0, object).toString(),
boo.getValue(columnHandleList.get(0).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(1, object),
boo.getValue(columnHandleList.get(1).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(2, object),
boo.getValue(columnHandleList.get(2).getName().substring(KEY_FIELD_NAME_PREFIX_LENGTH)));
Assert.assertEquals(keyValueSchemaHandler.extractField(3, object).toString(),
foo.getValue(columnHandleList.get(3).getName()));
Assert.assertEquals(keyValueSchemaHandler.extractField(4, object).toString(),
foo.getValue(columnHandleList.get(4).getName()) + "");
}
private List<PulsarColumnHandle> getColumnHandlerList(List<ColumnMetadata> columnMetadataList) {
List<PulsarColumnHandle> columnHandleList = new LinkedList<>();
columnMetadataList.forEach(columnMetadata -> {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
"connectorId",
pulsarColumnMetadata.getNameWithCase(),
pulsarColumnMetadata.getType(),
pulsarColumnMetadata.isHidden(),
pulsarColumnMetadata.isInternal(),
pulsarColumnMetadata.getFieldNames(),
pulsarColumnMetadata.getPositionIndices(),
pulsarColumnMetadata.getHandleKeyValueType());
columnHandleList.add(pulsarColumnHandle);
});
return columnHandleList;
}
public KeyValue<ByteBuf, ByteBuf> getKeyValueByteBuf(RawMessage message, Schema schema) {
KeyValueEncodingType encodingType = KeyValueSchemaInfo.decodeKeyValueEncodingType(schema.getSchemaInfo());
ByteBuf keyByteBuf = null;
if (Objects.equals(KeyValueEncodingType.SEPARATED, encodingType)) {
if (message.getKeyBytes().isPresent()) {
keyByteBuf = message.getKeyBytes().get();
} else {
keyByteBuf = null;
}
} else {
keyByteBuf = null;
}
return new KeyValue<>(keyByteBuf, Unpooled.wrappedBuffer(message.getData()));
}
@Data
static class Foo {
private String field1;
private Integer field2;
public Object getValue(String fieldName) {
switch (fieldName) {
case "field1":
return field1;
case "field2":
return field2 == null ? null : new Long(field2);
default:
return null;
}
}
}
@Data
static class Boo {
private String field1;
private Boolean field2;
private Double field3;
public Object getValue(String fieldName) {
switch (fieldName) {
case "field1":
return field1;
case "field2":
return field2;
case "field3":
return field3 == null ? null : field3.doubleValue();
default:
return null;
}
}
}
}