/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.queries;

import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.avro.Schema.*;

/**
 * Tests that Avro complex-type fields (RECORD, ARRAY, MAP, UNION, ENUM, and FIXED) can be ingested into JSON
 * columns in a Pinot segment.
 */
public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "JsonIngestionFromAvroTest");
private static final File AVRO_DATA_FILE = new File(INDEX_DIR, "JsonIngestionFromAvroTest.avro");
private static final String RAW_TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
private static final String INT_COLUMN = "intColumn";
private static final String JSON_COLUMN_1 = "jsonColumn1"; // for testing RECORD, ARRAY, MAP, UNION
private static final String JSON_COLUMN_2 = "jsonColumn2"; // for testing ENUM
private static final String JSON_COLUMN_3 = "jsonColumn3"; // for testing FIXED
private static final String STRING_COLUMN = "stringColumn";
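  // Every Avro complex type under test is ingested into a single-value JSON dimension.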
private static final org.apache.pinot.spi.data.Schema SCHEMA =
new org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
.addSingleValueDimension(JSON_COLUMN_1, FieldSpec.DataType.JSON)
.addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
.addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
.addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING).build();
private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;

  @Override
protected String getFilter() {
return "";
}

  @Override
protected IndexSegment getIndexSegment() {
return _indexSegment;
}

  @Override
protected List<IndexSegment> getIndexSegments() {
return _indexSegments;
}

  /** @return {@link GenericRow} representing a row in the Pinot table. */
private static GenericRow createTableRecord(int intValue, String stringValue, Object jsonValue,
GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue) {
GenericRow record = new GenericRow();
record.putValue(INT_COLUMN, intValue);
record.putValue(STRING_COLUMN, stringValue);
record.putValue(JSON_COLUMN_1, jsonValue);
record.putValue(JSON_COLUMN_2, enumValue);
record.putValue(JSON_COLUMN_3, fixedValue);
return record;
}
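
  /** @return Map of the given key-value pairs; ingested into a JSON column as a JSON object. */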
private static Map<String, String> createMapField(Pair<String, String>[] pairs) {
Map<String, String> map = new HashMap<>();
for (Pair<String, String> pair : pairs) {
map.put(pair.getLeft(), pair.getRight());
}
return map;
}
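
  /** @return Avro RECORD schema with an int {@code id} field and a string {@code name} field. */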
private static Schema createRecordSchema() {
List<Field> fields = new ArrayList<>();
fields.add(new Field("id", create(Type.INT)));
fields.add(new Field("name", create(Type.STRING)));
return createRecord("record", "doc", JsonIngestionFromAvroQueriesTest.class.getCanonicalName(), false, fields);
}
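
  /** @return Avro RECORD holding the two given key-value pairs; ingested into a JSON column as a JSON object. */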
private static GenericData.Record createRecordField(String k1, int v1, String k2, String v2) {
GenericData.Record record = new GenericData.Record(createRecordSchema());
record.put(k1, v1);
record.put(k2, v2);
return record;
}
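
  /** @return Avro ENUM symbol for the given value; ingested into a JSON column as a JSON string. */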
private static GenericData.EnumSymbol createEnumField(Schema enumSchema, String enumValue) {
return new GenericData.EnumSymbol(enumSchema, enumValue);
}
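
  /** @return 4-byte Avro FIXED value holding the big-endian encoding of the given int. */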
private static GenericData.Fixed createFixedField(Schema fixedSchema, int value) {
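    // Encode the int as four big-endian bytes to fill the 4-byte FIXED value.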
byte[] bytes = {(byte) (value >> 24), (byte) (value >> 16), (byte) (value >> 8), (byte) value};
return new GenericData.Fixed(fixedSchema, bytes);
}
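
  /**
   * Writes an Avro file whose rows cover ARRAY, MAP, RECORD, plain string, and JSON string values for jsonColumn1,
   * plus ENUM (jsonColumn2) and FIXED (jsonColumn3) values on every row.
   */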
private static void createInputFile()
throws IOException {
    FileUtils.forceMkdir(INDEX_DIR);
Schema avroSchema = createRecord("eventsRecord", null, null, false);
Schema enumSchema = createEnum("direction", null, null, Arrays.asList("UP", "DOWN", "LEFT", "RIGHT"));
Schema fixedSchema = createFixed("fixed", null, null, 4);
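    // jsonColumn1 is a UNION so each row can exercise a different branch: ARRAY, MAP, RECORD, plain STRING, or NULL.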
    List<Field> fields = Arrays.asList(
        new Field(INT_COLUMN, createUnion(Lists.newArrayList(create(Type.INT), create(Type.NULL))), null, null),
        new Field(STRING_COLUMN, createUnion(Lists.newArrayList(create(Type.STRING), create(Type.NULL))), null, null),
        new Field(JSON_COLUMN_1,
            createUnion(createArray(create(Type.STRING)), createMap(create(Type.STRING)), createRecordSchema(),
                create(Type.STRING), create(Type.NULL))),
        new Field(JSON_COLUMN_2, enumSchema),
        new Field(JSON_COLUMN_3, fixedSchema));
avroSchema.setFields(fields);
List<GenericRow> inputRecords = new ArrayList<>();
// Insert ARRAY
inputRecords.add(
createTableRecord(1, "daffy duck", Arrays.asList("this", "is", "a", "test"), createEnumField(enumSchema, "UP"),
createFixedField(fixedSchema, 1)));
// Insert MAP
inputRecords.add(
createTableRecord(2, "mickey mouse", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2)));
inputRecords.add(
createTableRecord(3, "donald duck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3)));
inputRecords.add(
createTableRecord(4, "scrooge mcduck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4)));
    // Insert RECORD
    inputRecords.add(createTableRecord(5, "minnie mouse", createRecordField("id", 1, "name", "minnie"),
        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5)));
    // Insert a plain Java String (stored as a JSON string value)
inputRecords.add(
createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6)));
    // Insert a JSON string (parsed and stored as a JSON document)
inputRecords.add(
createTableRecord(7, "scooby doo", "{\"name\":\"scooby\",\"id\":7}", createEnumField(enumSchema, "UP"),
createFixedField(fixedSchema, 7)));
try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
fileWriter.create(avroSchema, AVRO_DATA_FILE);
for (GenericRow inputRecord : inputRecords) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(INT_COLUMN, inputRecord.getValue(INT_COLUMN));
record.put(STRING_COLUMN, inputRecord.getValue(STRING_COLUMN));
record.put(JSON_COLUMN_1, inputRecord.getValue(JSON_COLUMN_1));
record.put(JSON_COLUMN_2, inputRecord.getValue(JSON_COLUMN_2));
record.put(JSON_COLUMN_3, inputRecord.getValue(JSON_COLUMN_3));
fileWriter.append(record);
}
}
}
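
  /** @return {@link AvroRecordReader} over the generated Avro file, reading all five source columns. */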
  private static RecordReader createRecordReader()
      throws IOException {
    Set<String> sourceFields =
        new HashSet<>(Arrays.asList(INT_COLUMN, STRING_COLUMN, JSON_COLUMN_1, JSON_COLUMN_2, JSON_COLUMN_3));
    AvroRecordReader avroRecordReader = new AvroRecordReader();
    avroRecordReader.init(AVRO_DATA_FILE, sourceFields, null);
    return avroRecordReader;
  }

  /** Creates an Avro input file, then builds and loads a Pinot segment with a JSON index on the JSON columns. */
@BeforeClass
public void setUp()
throws Exception {
FileUtils.deleteDirectory(INDEX_DIR);
createInputFile();
List<String> jsonIndexColumns = Arrays.asList(JSON_COLUMN_1, JSON_COLUMN_2, JSON_COLUMN_3);
TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
segmentGeneratorConfig.setInputFilePath(AVRO_DATA_FILE.getPath());
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig, createRecordReader());
driver.build();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setTableConfig(TABLE_CONFIG);
    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
indexLoadingConfig.setReadMode(ReadMode.mmap);
ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
_indexSegment = immutableSegment;
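    // Register the same segment twice so that inter-segment (multi-segment) query paths are also exercised.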
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}

  /** Verify that we can query the JSON columns that ingested complex-type data from the Avro file (see setUp). */
@Test
public void testSimpleSelectOnJsonColumn() {
    Operator<SelectionResultsBlock> operator =
        getOperator("select intColumn, stringColumn, jsonColumn1, jsonColumn2 FROM testTable limit 100");
SelectionResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getRows();
Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.INT);
Assert.assertEquals(block.getDataSchema().getColumnDataType(1), DataSchema.ColumnDataType.STRING);
Assert.assertEquals(block.getDataSchema().getColumnDataType(2), DataSchema.ColumnDataType.JSON);
    List<String> expectedRows = Arrays.asList("[1, daffy duck, [\"this\",\"is\",\"a\",\"test\"], \"UP\"]",
        "[2, mickey mouse, {\"a\":\"1\",\"b\":\"2\"}, \"DOWN\"]", "[3, donald duck, {\"a\":\"1\",\"b\":\"2\"}, \"UP\"]",
        "[4, scrooge mcduck, {\"a\":\"1\",\"b\":\"2\"}, \"LEFT\"]",
        "[5, minnie mouse, {\"name\":\"minnie\",\"id\":1}, \"RIGHT\"]", "[6, pluto, \"test\", \"DOWN\"]",
        "[7, scooby doo, {\"name\":\"scooby\",\"id\":7}, \"UP\"]");
    int index = 0;
    for (Object[] row : rows) {
      Assert.assertEquals(Arrays.toString(row), expectedRows.get(index++));
    }
}

  /** Verify a simple JSON path expression query on the ingested Avro data. */
@Test
public void testJsonPathSelectOnJsonColumn() {
    Operator<SelectionResultsBlock> operator = getOperator(
        "select intColumn, json_extract_scalar(jsonColumn1, '$.name', 'STRING', 'null') FROM testTable");
SelectionResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getRows();
Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.INT);
Assert.assertEquals(block.getDataSchema().getColumnDataType(1), DataSchema.ColumnDataType.STRING);
    List<String> expectedRows =
        Arrays.asList("[1, null]", "[2, null]", "[3, null]", "[4, null]", "[5, minnie]", "[6, null]", "[7, scooby]");
    int index = 0;
    for (Object[] row : rows) {
      Assert.assertEquals(Arrays.toString(row), expectedRows.get(index++));
    }
}

  /** Verify that a plain string value ingested into a JSON column can be matched and extracted. */
@Test
public void testStringValueSelectOnJsonColumn() {
    Operator<SelectionResultsBlock> operator = getOperator(
        "SELECT json_extract_scalar(jsonColumn1, '$', 'STRING') FROM testTable"
            + " WHERE JSON_MATCH(jsonColumn1, '\"$\" = ''test''')");
SelectionResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getRows();
Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.STRING);
    List<String> expectedRows = Arrays.asList("[test]");
    int index = 0;
    for (Object[] row : rows) {
      Assert.assertEquals(Arrays.toString(row), expectedRows.get(index++));
    }
}

  /** Verify that the Avro FIXED field (jsonColumn3) was ingested into the Pinot JSON column as an array of bytes. */
@Test
public void testSimpleSelectOnFixedJsonColumn() {
Operator<SelectionResultsBlock> operator = getOperator("select jsonColumn3 FROM testTable");
SelectionResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getRows();
Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.JSON);
    List<String> expectedRows =
        Arrays.asList("[[0,0,0,1]]", "[[0,0,0,2]]", "[[0,0,0,3]]", "[[0,0,0,4]]", "[[0,0,0,5]]", "[[0,0,0,6]]",
            "[[0,0,0,7]]");
    int index = 0;
    for (Object[] row : rows) {
      Assert.assertEquals(Arrays.toString(row), expectedRows.get(index++));
    }
}

  @AfterClass
public void tearDown()
throws IOException {
_indexSegment.destroy();
FileUtils.deleteDirectory(INDEX_DIR);
}
}