blob: 8f03dd53e77bffc0ce6881185628b9c125a98cbd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.json;
import static org.apache.drill.test.rowSet.RowSetUtilities.longArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
import static org.junit.Assert.assertTrue;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.drill.categories.RowSetTest;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.physical.impl.scan.BaseScanOperatorExecTest.BaseScanFixtureBuilder;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(RowSetTest.class)
public class TestJsonScanOp extends SubOperatorTest {
/**
 * Test-only {@link ManagedReader} that opens a JSON file from the test
 * classpath and feeds it to a {@link JsonLoader} built with the supplied
 * options. Each call to {@link #next()} delegates to
 * {@link JsonLoader#readBatch()}.
 */
private static class JsonReaderFixure implements ManagedReader<SchemaNegotiator> {

  private final String filePath;
  private final JsonLoaderOptions options;
  private InputStream stream;
  private JsonLoader jsonLoader;

  /**
   * @param filePath classpath location of the JSON file, resolved relative
   *                 to this class via {@code getResourceAsStream}
   * @param options  loader options handed to the {@link JsonLoaderBuilder}
   */
  public JsonReaderFixure(String filePath, JsonLoaderOptions options) {
    this.filePath = filePath;
    this.options = options;
  }

  /**
   * Opens the JSON resource and builds the loader on top of the result set
   * loader produced by the negotiator.
   *
   * @param negotiator schema negotiator supplied by the scan framework
   * @return always {@code true}
   * @throws IllegalStateException if the resource is not on the classpath
   */
  @Override
  public boolean open(SchemaNegotiator negotiator) {
    InputStream resource = getClass().getResourceAsStream(filePath);
    if (resource == null) {
      // getResourceAsStream signals "not found" with null. Fail here with
      // the path rather than later, on the first read of a null-backed
      // BufferedInputStream.
      throw new IllegalStateException("Test resource not found on classpath: " + filePath);
    }
    stream = new BufferedInputStream(resource);
    jsonLoader = new JsonLoaderBuilder()
        .resultSetLoader(negotiator.build())
        .options(options)
        .fromStream(stream)
        .build();
    return true;
  }

  /**
   * @return the value reported by {@link JsonLoader#readBatch()}
   */
  @Override
  public boolean next() {
    return jsonLoader.readBatch();
  }

  /**
   * Closes the loader (if one was built) and then the underlying stream.
   * The stream is closed even when the loader was never created, for
   * example because {@link #open(SchemaNegotiator)} failed part-way.
   */
  @Override
  public void close() {
    try {
      if (jsonLoader != null) {
        jsonLoader.close();
        jsonLoader = null;
      }
    } finally {
      closeStream();
    }
  }

  /**
   * Closes the resource stream if it is open. Closing a stream the loader
   * may already have closed is harmless: closing an already-closed
   * {@link BufferedInputStream} has no effect.
   */
  private void closeStream() {
    if (stream == null) {
      return;
    }
    try {
      stream.close();
    } catch (IOException e) {
      throw new IllegalStateException("Failed to close JSON test resource: " + filePath, e);
    } finally {
      stream = null;
    }
  }
}
/**
 * Test the case where the reader does not play the "first batch contains
 * only schema" game, and instead returns data. The Scan operator will
 * split the first batch into two: one with schema only, another with
 * data.
 * <p>
 * Runs with {@code allTextMode} enabled and projects two top-level
 * columns, {@code field_3} and {@code field_5}; the expected schema
 * uses VARCHAR for every scalar.
 */
@Test
public void testScanOperator() {
  BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
  JsonLoaderOptions options = new JsonLoaderOptions();
  options.allTextMode = true;
  builder.addReader(new JsonReaderFixure("/store/json/schema_change_int_to_string.json", options));
  builder.setProjection("field_3", "field_5");
  ScanFixture scanFixture = builder.build();

  // Close the fixture even when an assertion fails, so a failing test
  // does not leave the scan operator and its context open.
  try {
    ScanOperatorExec scanOp = scanFixture.scanOp;

    // First batch carries the schema; release it before asking for data.
    assertTrue(scanOp.buildSchema());
    fixture.wrap(scanOp.batchAccessor().container()).clear();

    // Second batch carries the rows to verify.
    assertTrue(scanOp.next());
    RowSet result = fixture.wrap(scanOp.batchAccessor().container());

    TupleMetadata expectedSchema = new SchemaBuilder()
        .addMap("field_3")
          .addNullable("inner_1", MinorType.VARCHAR)
          .addNullable("inner_2", MinorType.VARCHAR)
          .addMapArray("inner_3")
            .addNullable("inner_object_field_1", MinorType.VARCHAR)
            .resumeMap()
          .resumeSchema()
        .addMapArray("field_5")
          .addArray("inner_list", MinorType.VARCHAR)
          .addArray("inner_list_2", MinorType.VARCHAR)
          .resumeSchema()
        .buildSchema();

    RowSet expected = fixture.rowSetBuilder(expectedSchema)
        .addRow(mapValue(null, null, mapArray()),
            mapArray())
        .addRow(mapValue("2", null, mapArray()),
            mapArray(
                mapValue(strArray("1", "", "6"), strArray()),
                mapValue(strArray("3", "8"), strArray()),
                mapValue(strArray("12", "", "4", "null", "5"), strArray())))
        .addRow(mapValue("5", "3", mapArray(singleMap(null), singleMap("10"))),
            mapArray(
                mapValue(strArray("5", "", "6.0", "1234"), strArray()),
                mapValue(strArray("7", "8.0", "12341324"),
                    strArray("1", "2", "2323.443e10", "hello there")),
                mapValue(strArray("3", "4", "5"), strArray("10", "11", "12"))))
        .build();

    RowSetUtilities.verify(expected, result);
  } finally {
    scanFixture.close();
  }
}
/**
 * Projects two members of the {@code field_3} map
 * ({@code field_3.inner_1} and {@code field_3.inner_2}) using default
 * loader options, and verifies that the data batch contains a
 * {@code field_3} map holding only those two members, typed as
 * nullable BIGINT.
 */
@Test
public void testScanProjectMapSubset() {
  BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
  JsonLoaderOptions options = new JsonLoaderOptions();
  builder.addReader(new JsonReaderFixure("/store/json/schema_change_int_to_string.json", options));
  builder.setProjection("field_3.inner_1", "field_3.inner_2");
  ScanFixture scanFixture = builder.build();

  // Close the fixture even when an assertion fails, so a failing test
  // does not leave the scan operator and its context open.
  try {
    ScanOperatorExec scanOp = scanFixture.scanOp;

    // The schema batch is not inspected; only the data batch is verified.
    assertTrue(scanOp.buildSchema());
    assertTrue(scanOp.next());
    RowSet result = fixture.wrap(scanOp.batchAccessor().container());

    TupleMetadata schema = new SchemaBuilder()
        .addMap("field_3")
          .addNullable("inner_1", MinorType.BIGINT)
          .addNullable("inner_2", MinorType.BIGINT)
          .resumeSchema()
        .build();

    RowSet expected = fixture.rowSetBuilder(schema)
        .addSingleCol(mapValue(null, null))
        .addSingleCol(mapValue(2L, null))
        .addSingleCol(mapValue(5L, 3L))
        .build();

    RowSetUtilities.verify(expected, result);
  } finally {
    scanFixture.close();
  }
}
/**
 * Projects one existing member ({@code inner_list}) and one member that
 * the expected data shows as always null ({@code dummy}) from the
 * {@code field_5} map array, in all-text mode. The null type is set to
 * nullable VARCHAR, and the expected schema types {@code dummy}
 * accordingly.
 */
@Test
public void testScanProjectMapArraySubsetAndNull() {
  BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
  JsonLoaderOptions options = new JsonLoaderOptions();
  options.allTextMode = true;
  builder.addReader(new JsonReaderFixure("/store/json/schema_change_int_to_string.json", options));
  builder.setProjection("field_5.inner_list", "field_5.dummy");
  builder.builder().nullType(Types.optional(MinorType.VARCHAR));
  ScanFixture scanFixture = builder.build();

  // Close the fixture even when an assertion fails, so a failing test
  // does not leave the scan operator and its context open.
  try {
    ScanOperatorExec scanOp = scanFixture.scanOp;

    // The schema batch is not inspected; only the data batch is verified.
    assertTrue(scanOp.buildSchema());
    assertTrue(scanOp.next());
    RowSet result = fixture.wrap(scanOp.batchAccessor().container());

    TupleMetadata schema = new SchemaBuilder()
        .addMapArray("field_5")
          .addArray("inner_list", MinorType.VARCHAR)
          .addNullable("dummy", MinorType.VARCHAR)
          .resumeSchema()
        .build();

    RowSet expected = fixture.rowSetBuilder(schema)
        .addSingleCol(mapArray())
        .addSingleCol(mapArray(
            mapValue(strArray("1", "", "6"), null),
            mapValue(strArray("3", "8"), null),
            mapValue(strArray("12", "", "4", "null", "5"), null)))
        .addSingleCol(mapArray(
            mapValue(strArray("5", "", "6.0", "1234"), null),
            mapValue(strArray("7", "8.0", "12341324"), null),
            mapValue(strArray("3", "4", "5"), null)))
        .build();

    RowSetUtilities.verify(expected, result);
  } finally {
    scanFixture.close();
  }
}
/**
 * Projects a mix of top-level columns, map members and columns that the
 * projection names as non-existent, using default loader options. The
 * null type is set to nullable VARCHAR, and the expected schema shows the
 * non-existent columns with that type.
 */
@Test
public void testScanProject() {
  BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
  JsonLoaderOptions options = new JsonLoaderOptions();
  builder.addReader(new JsonReaderFixure("/store/json/schema_change_int_to_string.json", options));

  // Projection omits field_2 which has an ambiguous type. Since
  // the field is not materialized, the ambiguity is benign.
  // (If this test triggers an error, perhaps a change has caused
  // the column to become materialized.)
  builder.setProjection("field_1", "field_3.inner_1", "field_3.inner_2", "field_4.inner_1",
      "non_existent_at_root", "non_existent.nested.field");
  builder.builder().nullType(Types.optional(MinorType.VARCHAR));
  ScanFixture scanFixture = builder.build();

  // Close the fixture even when an assertion fails, so a failing test
  // does not leave the scan operator and its context open.
  try {
    ScanOperatorExec scanOp = scanFixture.scanOp;

    // The scan yields two batches: the first, empty batch requested by
    // buildSchema(), then the data batch verified below.
    assertTrue(scanOp.buildSchema());
    assertTrue(scanOp.next());
    RowSet result = fixture.wrap(scanOp.batchAccessor().container());

    // All projected columns appear in the result, including the
    // non-existent ones, since the revised scan operator handles
    // missing-column projection.
    TupleMetadata schema = new SchemaBuilder()
        .addArray("field_1", MinorType.BIGINT)
        .addMap("field_3")
          .addNullable("inner_1", MinorType.BIGINT)
          .addNullable("inner_2", MinorType.BIGINT)
          .resumeSchema()
        .addMap("field_4")
          .addArray("inner_1", MinorType.BIGINT)
          .resumeSchema()
        .addNullable("non_existent_at_root", MinorType.VARCHAR)
        .addMap("non_existent")
          .addMap("nested")
            .addNullable("field", MinorType.VARCHAR)
            .resumeMap()
          .resumeSchema()
        .build();

    // Value for non_existent: a map holding a map holding a single null.
    Object nullMap = singleMap(singleMap(null));
    RowSet expected = fixture.rowSetBuilder(schema)
        .addRow(longArray(1L), mapValue(null, null), singleMap(longArray()), null, nullMap)
        .addRow(longArray(5L), mapValue(2L, null), singleMap(longArray(1L, 2L, 3L)), null, nullMap)
        .addRow(longArray(5L, 10L, 15L), mapValue(5L, 3L), singleMap(longArray(4L, 5L, 6L)), null, nullMap)
        .build();

    RowSetUtilities.verify(expected, result);
  } finally {
    scanFixture.close();
  }
}
}