/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.iceberg;

import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.physical.rowSet.DirectRowSet;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.iceberg.format.IcebergFormatPluginConfig;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.util.Text;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.hamcrest.MatcherAssert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
import static org.apache.drill.test.TestBuilder.listOf;
import static org.apache.drill.test.TestBuilder.mapOf;
import static org.apache.drill.test.TestBuilder.mapOfObject;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
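
/**
 * Tests for querying Iceberg tables through Drill's Iceberg format plugin: data files written in
 * Parquet, Avro and ORC, snapshot-based and incremental reads, filter, limit and projection
 * pushdown (including nested columns), schema provisioning, and the Iceberg metadata tables
 * exposed via the {@code table#suffix} syntax.
 */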
public class IcebergQueriesTest extends ClusterTest {
private static Table table;
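// Registers the Iceberg format on the dfs storage plugin and creates the Iceberg test tables shared by all tests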
@BeforeClass
public static void setUpBeforeClass() throws Exception {
startCluster(ClusterFixture.builder(dirTestWatcher));
StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
formats.put("iceberg", IcebergFormatPluginConfig.builder().build());
FileSystemConfig newPluginConfig = new FileSystemConfig(
pluginConfig.getConnection(),
pluginConfig.getConfig(),
pluginConfig.getWorkspaces(),
formats,
PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
newPluginConfig.setEnabled(pluginConfig.isEnabled());
pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
// Define another plugin with the Iceberg format to verify that DRILL-8049 is fixed
FileSystemConfig anotherFileSystemConfig = pluginConfig.copyWithFormats(formats);
pluginRegistry.put("dfs2", anotherFileSystemConfig);
Configuration config = new Configuration();
config.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
HadoopTables tables = new HadoopTables(config);
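// Schemas: a small struct schema reused for nested fields, plus a full schema covering all supported types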
Schema structSchema = new Schema(
Types.NestedField.optional(13, "struct_int_field", Types.IntegerType.get()),
Types.NestedField.optional(14, "struct_string_field", Types.StringType.get())
);
Types.ListType repeatedStructType = Types.ListType.ofOptional(
16, Types.StructType.of(
Types.NestedField.optional(17, "struct_int_field", Types.IntegerType.get()),
Types.NestedField.optional(18, "struct_string_field", Types.StringType.get())
));
Schema schema = new Schema(
Types.NestedField.optional(1, "int_field", Types.IntegerType.get()),
Types.NestedField.optional(2, "long_field", Types.LongType.get()),
Types.NestedField.optional(3, "float_field", Types.FloatType.get()),
Types.NestedField.optional(4, "double_field", Types.DoubleType.get()),
Types.NestedField.optional(5, "string_field", Types.StringType.get()),
Types.NestedField.optional(6, "boolean_field", Types.BooleanType.get()),
Types.NestedField.optional(26, "time_field", Types.TimeType.get()),
Types.NestedField.optional(27, "timestamp_field", Types.TimestampType.withoutZone()),
Types.NestedField.optional(28, "date_field", Types.DateType.get()),
Types.NestedField.optional(29, "decimal_field", Types.DecimalType.of(4, 2)),
Types.NestedField.optional(30, "uuid_field", Types.UUIDType.get()),
Types.NestedField.optional(31, "fixed_field", Types.FixedType.ofLength(10)),
Types.NestedField.optional(32, "binary_field", Types.BinaryType.get()),
Types.NestedField.optional(7, "list_field", Types.ListType.ofOptional(
10, Types.StringType.get())),
Types.NestedField.optional(8, "map_field", Types.MapType.ofOptional(
11, 12, Types.StringType.get(), Types.FloatType.get())),
Types.NestedField.required(9, "struct_field", structSchema.asStruct()),
Types.NestedField.required(15, "repeated_struct_field", repeatedStructType),
Types.NestedField.required(19, "repeated_list_field", Types.ListType.ofOptional(
20, Types.ListType.ofOptional(21, Types.StringType.get()))),
Types.NestedField.optional(22, "repeated_map_field", Types.ListType.ofOptional(
23, Types.MapType.ofOptional(24, 25, Types.StringType.get(), Types.FloatType.get())))
);
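// Test data: one fully populated record, one record with nulls in every optional field, and a second populated record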
List<String> listValue = Arrays.asList("a", "b", "c");
Map<String, Float> mapValue = new HashMap<>();
mapValue.put("a", 0.1F);
mapValue.put("b", 0.2F);
Map<String, Float> secondMapValue = new HashMap<>();
secondMapValue.put("true", 1F);
secondMapValue.put("false", 0F);
Record structValue = GenericRecord.create(structSchema);
structValue.setField("struct_int_field", 123);
structValue.setField("struct_string_field", "abc");
Record secondStructValue = GenericRecord.create(structSchema);
secondStructValue.setField("struct_int_field", 321);
secondStructValue.setField("struct_string_field", "def");
Record record = GenericRecord.create(schema);
record.setField("int_field", 1);
record.setField("long_field", 100L);
record.setField("float_field", 0.5F);
record.setField("double_field", 1.5D);
record.setField("string_field", "abc");
record.setField("boolean_field", true);
record.setField("time_field", LocalTime.of(2, 42, 42));
record.setField("timestamp_field", LocalDateTime.of(1994, 4, 18, 11, 0, 0));
record.setField("date_field", LocalDate.of(1994, 4, 18));
record.setField("decimal_field", new BigDecimal("12.34"));
record.setField("uuid_field", new byte[16]);
record.setField("fixed_field", new byte[10]);
record.setField("binary_field", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
record.setField("list_field", listValue);
record.setField("map_field", mapValue);
record.setField("struct_field", structValue);
record.setField("repeated_struct_field", Arrays.asList(structValue, structValue));
record.setField("repeated_list_field", Arrays.asList(listValue, listValue));
record.setField("repeated_map_field", Arrays.asList(mapValue, mapValue));
Record nullsRecord = GenericRecord.create(schema);
nullsRecord.setField("int_field", null);
nullsRecord.setField("long_field", null);
nullsRecord.setField("float_field", null);
nullsRecord.setField("double_field", null);
nullsRecord.setField("string_field", null);
nullsRecord.setField("boolean_field", null);
nullsRecord.setField("time_field", null);
nullsRecord.setField("timestamp_field", null);
nullsRecord.setField("date_field", null);
nullsRecord.setField("decimal_field", null);
nullsRecord.setField("uuid_field", null);
nullsRecord.setField("fixed_field", null);
nullsRecord.setField("binary_field", null);
nullsRecord.setField("list_field", null);
nullsRecord.setField("map_field", null);
nullsRecord.setField("struct_field", GenericRecord.create(structSchema));
nullsRecord.setField("repeated_struct_field", Collections.emptyList());
nullsRecord.setField("repeated_list_field", Collections.emptyList());
nullsRecord.setField("repeated_map_field", Collections.emptyList());
Record secondRecord = GenericRecord.create(schema);
secondRecord.setField("int_field", 988);
secondRecord.setField("long_field", 543L);
secondRecord.setField("float_field", Float.NaN);
secondRecord.setField("double_field", Double.MAX_VALUE);
secondRecord.setField("string_field", "def");
secondRecord.setField("boolean_field", false);
secondRecord.setField("time_field", LocalTime.of(3, 41, 53));
secondRecord.setField("timestamp_field", LocalDateTime.of(1995, 9, 10, 9, 0, 0));
secondRecord.setField("date_field", LocalDate.of(1995, 9, 10));
secondRecord.setField("decimal_field", new BigDecimal("99.99"));
secondRecord.setField("uuid_field", new byte[16]);
secondRecord.setField("fixed_field", new byte[10]);
secondRecord.setField("binary_field", ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)));
secondRecord.setField("list_field", Arrays.asList("y", "n"));
secondRecord.setField("map_field", secondMapValue);
secondRecord.setField("struct_field", secondStructValue);
secondRecord.setField("repeated_struct_field", Arrays.asList(structValue, secondStructValue));
secondRecord.setField("repeated_list_field", Arrays.asList(listValue, Arrays.asList("y", "n")));
secondRecord.setField("repeated_map_field", Arrays.asList(mapValue, secondMapValue));
String location = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypes").toUri().getPath();
table = tables.create(schema, location);
writeParquetAndCommitDataFile(table, "allTypes", Arrays.asList(record, nullsRecord));
writeParquetAndCommitDataFile(table, "allTypes_1", Collections.singleton(secondRecord));
String avroLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesAvro").toUri().getPath();
writeAndCommitDataFile(tables.create(structSchema, avroLocation), "allTypes", FileFormat.AVRO,
Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
String orcLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesOrc").toUri().getPath();
writeAndCommitDataFile(tables.create(structSchema, orcLocation), "allTypes", FileFormat.ORC,
Arrays.asList(structValue, GenericRecord.create(structSchema), secondStructValue));
String emptyTableLocation = Paths.get(dirTestWatcher.getDfsTestTmpDir().toURI().getPath(), "testAllTypesEmpty").toUri().getPath();
tables.create(structSchema, emptyTableLocation);
}
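// Helpers: write the given records into a data file of the requested format and commit it to the table in a single transaction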
private static void writeParquetAndCommitDataFile(Table table, String name, Iterable<Record> records) throws IOException {
writeAndCommitDataFile(table, name, FileFormat.PARQUET, records);
}
private static void writeAndCommitDataFile(Table table, String name, FileFormat fileFormat, Iterable<Record> records) throws IOException {
OutputFile outputFile = table.io().newOutputFile(
new Path(table.location(), fileFormat.addExtension(name)).toUri().getPath());
FileAppender<Record> fileAppender = new GenericAppenderFactory(table.schema())
.newAppender(outputFile, fileFormat);
fileAppender.addAll(records);
fileAppender.close();
DataFile dataFile = DataFiles.builder(table.spec())
.withInputFile(outputFile.toInputFile())
.withMetrics(fileAppender.metrics())
.build();
Transaction transaction = table.newTransaction();
transaction.newAppend()
.appendFile(dataFile)
.commit();
transaction.commitTransaction();
}
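// Verifies that the physical plan of an Iceberg scan survives a JSON serialization round trip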
@Test
public void testSerDe() throws Exception {
String plan = queryBuilder().sql("select * from dfs.tmp.testAllTypes").explainJson();
long count = queryBuilder().physical(plan).run().recordCount();
assertEquals(3, count);
}
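// Reads the table as of the earliest snapshot id; only the first data file (2 rows) should be visible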
@Test
public void testSelectWithSnapshotId() throws Exception {
String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotId => %s))";
long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
String plan = queryBuilder().sql(query, snapshotId).explainJson();
long count = queryBuilder().physical(plan).run().recordCount();
assertEquals(2, count);
}
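// Reads the table as of the earliest commit timestamp; only the first data file (2 rows) should be visible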
@Test
public void testSelectWithSnapshotAsOfTime() throws Exception {
String snapshotQuery = "select committed_at from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotAsOfTime => %s))";
long snapshotAsOfTime = queryBuilder().sql(snapshotQuery).singletonLong();
String plan = queryBuilder().sql(query, snapshotAsOfTime).explainJson();
long count = queryBuilder().physical(plan).run().recordCount();
assertEquals(2, count);
}
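// Incremental read starting from the earliest snapshot; only the row appended afterwards should be returned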
@Test
public void testSelectFromSnapshotId() throws Exception {
String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at limit 1";
String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => %s))";
long snapshotId = queryBuilder().sql(snapshotQuery).singletonLong();
String plan = queryBuilder().sql(query, snapshotId).explainJson();
long count = queryBuilder().physical(plan).run().recordCount();
assertEquals(1, count);
}
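// Incremental read bounded by the first and second snapshot ids; expects only the row appended in between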
@Test
public void testSelectFromSnapshotIdAndToSnapshotId() throws Exception {
String snapshotQuery = "select snapshot_id from dfs.tmp.`testAllTypes#snapshots` order by committed_at";
String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', fromSnapshotId => %s, toSnapshotId => %s))";
DirectRowSet rowSet = queryBuilder().sql(snapshotQuery).rowSet();
try {
RowSetReader reader = rowSet.reader();
assertTrue(reader.next());
Long fromSnapshotId = (Long) reader.column(0).reader().getObject();
assertTrue(reader.next());
Long toSnapshotId = (Long) reader.column(0).reader().getObject();
String plan = queryBuilder().sql(query, fromSnapshotId, toSnapshotId).explainJson();
long count = queryBuilder().physical(plan).run().recordCount();
assertEquals(1, count);
} finally {
rowSet.clear();
}
}
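// snapshotId and snapshotAsOfTime are mutually exclusive, so the query is expected to fail with a descriptive error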
@Test
public void testSelectWithSnapshotIdAndSnapshotAsOfTime() throws Exception {
String query = "select * from table(dfs.tmp.testAllTypes(type => 'iceberg', snapshotId => %s, snapshotAsOfTime => %s))";
try {
queryBuilder().sql(query, 123, 456).run();
fail();
} catch (UserRemoteException e) {
MatcherAssert.assertThat(e.getVerboseMessage(), containsString("Both 'snapshotId' and 'snapshotAsOfTime' cannot be specified"));
}
}
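// Full scan verifying Drill's mapping of every Iceberg type, including nested lists, maps and structs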
@Test
public void testAllTypes() throws Exception {
testBuilder()
.sqlQuery("select * from dfs.tmp.testAllTypes")
.unOrdered()
.baselineColumns("int_field", "long_field", "float_field", "double_field", "string_field",
"boolean_field", "time_field", "timestamp_field", "date_field", "decimal_field", "uuid_field",
"fixed_field", "binary_field", "list_field", "map_field", "struct_field", "repeated_struct_field",
"repeated_list_field", "repeated_map_field")
.baselineValues(1, 100L, 0.5F, 1.5D, "abc", true, LocalTime.of(2, 42, 42),
LocalDateTime.of(1994, 4, 18, 11, 0, 0), LocalDate.of(1994, 4, 18),
new BigDecimal("12.34"), new byte[16], new byte[10], "hello".getBytes(StandardCharsets.UTF_8),
listOf("a", "b", "c"),
mapOfObject(
new Text("a"), 0.1F,
new Text("b"), 0.2F),
mapOf(
"struct_int_field", 123,
"struct_string_field", "abc"),
listOf(
mapOf(
"struct_int_field", 123,
"struct_string_field", "abc"),
mapOf(
"struct_int_field", 123,
"struct_string_field", "abc")),
listOf(listOf("a", "b", "c"), listOf("a", "b", "c")),
listOf(
mapOfObject(
new Text("a"), 0.1F,
new Text("b"), 0.2F),
mapOfObject(
new Text("a"), 0.1F,
new Text("b"), 0.2F))
)
.baselineValues(null, null, null, null, null, null, null, null, null, null, null, null, null,
listOf(), mapOfObject(), mapOf(), listOf(), listOf(), listOf())
.baselineValues(988, 543L, Float.NaN, Double.MAX_VALUE, "def", false, LocalTime.of(3, 41, 53),
LocalDateTime.of(1995, 9, 10, 9, 0, 0), LocalDate.of(1995, 9, 10),
new BigDecimal("99.99"), new byte[16], new byte[10], "world".getBytes(StandardCharsets.UTF_8),
listOf("y", "n"),
mapOfObject(
new Text("true"), 1F,
new Text("false"), 0F),
mapOf(
"struct_int_field", 321,
"struct_string_field", "def"),
listOf(
mapOf(
"struct_int_field", 123,
"struct_string_field", "abc"),
mapOf(
"struct_int_field", 321,
"struct_string_field", "def")),
listOf(listOf("a", "b", "c"), listOf("y", "n")),
listOf(
mapOfObject(
new Text("a"), 0.1F,
new Text("b"), 0.2F),
mapOfObject(
new Text("true"), 1F,
new Text("false"), 0F))
)
.go();
}
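// Projection pushdown: only the selected columns should appear in the Iceberg scan projection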
@Test
public void testProjectingColumns() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes";
queryBuilder()
.sql(query)
.planMatcher()
.include("projection\\=struct<1: int_field: optional int, 5: string_field: optional string>")
.match();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("int_field", "string_field")
.baselineValues(1, "abc")
.baselineValues(null, null)
.baselineValues(988, "def")
.go();
}
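// Projection pushdown for nested columns: only the referenced struct members should be projected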
@Test
public void testProjectNestedColumn() throws Exception {
String query = "select t.struct_field.struct_int_field as i, list_field[1] as l," +
"t.repeated_struct_field[0].struct_string_field s from dfs.tmp.testAllTypes t";
queryBuilder()
.sql(query)
.planMatcher()
.include("projection\\=struct<" +
"14: list_field: optional list<string>, " +
"16: struct_field: required struct<23: struct_int_field: optional int>, " +
"17: repeated_struct_field: required list<struct<27: struct_string_field: optional string>>")
.match();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("i", "l", "s")
.baselineValues(123, "b", "abc")
.baselineValues(null, null, null)
.baselineValues(321, "n", "abc")
.go();
}
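// Filter pushdown: the predicate on long_field should appear in the Iceberg scan, and only the matching row is returned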
@Test
public void testFilterPushdown() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes where long_field = 100";
queryBuilder()
.sql(query)
.planMatcher()
.include("filter\\=ref\\(name\\=\"long_field\"\\) \\=\\= 100")
.include("projection\\=struct<1: int_field: optional int, 2: long_field: optional long, 5: string_field: optional string>")
.match();
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("int_field", "string_field")
.baselineValues(1, "abc")
.go();
}
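// A pushed-down filter that matches no rows should produce an empty result set rather than an error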
@Test
public void testEmptyResults() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes where long_field = 101";
queryBuilder()
.sql(query)
.planMatcher()
.include("filter\\=ref\\(name\\=\"long_field\"\\) \\=\\= 101")
.include("projection\\=struct<1: int_field: optional int, 2: long_field: optional long, 5: string_field: optional string>")
.match();
testBuilder()
.sqlQuery(query)
.ordered()
.expectsEmptyResultSet()
.go();
}
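// Filter comparing a long column with a varchar literal still returns the expected row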
@Test
public void testFilterWithDifferentTypes() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes where long_field = '100'";
testBuilder()
.sqlQuery(query)
.ordered()
.baselineColumns("int_field", "string_field")
.baselineValues(1, "abc")
.go();
}
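// The following tests query Iceberg metadata tables exposed through the `table#suffix` syntax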
@Test
public void testSelectEntriesMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#entries`";
long count = queryBuilder().sql(query).run().recordCount();
assertEquals(2, count);
}
@Test
public void testSelectFilesMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#files`";
long count = queryBuilder().sql(query).run().recordCount();
assertEquals(2, count);
}
@Test
public void testSelectHistoryMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#history`";
List<HistoryEntry> entries = table.history();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("made_current_at", "snapshot_id", "parent_id", "is_current_ancestor")
.baselineValues(LocalDateTime.ofInstant(Instant.ofEpochMilli(entries.get(0).timestampMillis()), ZoneId.of("UTC")),
entries.get(0).snapshotId(), null, true)
.baselineValues(LocalDateTime.ofInstant(Instant.ofEpochMilli(entries.get(1).timestampMillis()), ZoneId.of("UTC")),
entries.get(1).snapshotId(), entries.get(0).snapshotId(), true)
.go();
}
@Test
public void testSelectSnapshotsMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#snapshots`";
List<Snapshot> snapshots = new ArrayList<>();
table.snapshots().forEach(snapshots::add);
JsonStringHashMap<Object, Object> summaryMap = new JsonStringHashMap<>();
snapshots.get(0).summary().forEach((k, v) ->
summaryMap.put(new Text(k.getBytes(StandardCharsets.UTF_8)), new Text(v.getBytes(StandardCharsets.UTF_8))));
JsonStringHashMap<Object, Object> secondSummaryMap = new JsonStringHashMap<>();
snapshots.get(1).summary().forEach((k, v) ->
secondSummaryMap.put(new Text(k.getBytes(StandardCharsets.UTF_8)), new Text(v.getBytes(StandardCharsets.UTF_8))));
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("committed_at", "snapshot_id", "parent_id", "operation", "manifest_list", "summary")
.baselineValues(LocalDateTime.ofInstant(Instant.ofEpochMilli(snapshots.get(0).timestampMillis()), ZoneId.of("UTC")),
snapshots.get(0).snapshotId(), snapshots.get(0).parentId(), snapshots.get(0).operation(), snapshots.get(0).manifestListLocation(), summaryMap)
.baselineValues(LocalDateTime.ofInstant(Instant.ofEpochMilli(snapshots.get(1).timestampMillis()), ZoneId.of("UTC")),
snapshots.get(1).snapshotId(), snapshots.get(1).parentId(), snapshots.get(1).operation(), snapshots.get(1).manifestListLocation(), secondSummaryMap)
.go();
}
@Test
public void testSelectManifestsMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#manifests`";
long count = queryBuilder().sql(query).run().recordCount();
assertEquals(2, count);
}
@Test
public void testSelectPartitionsMetadata() throws Exception {
String query = "select * from dfs.tmp.`testAllTypes#partitions`";
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("record_count", "file_count")
.baselineValues(3L, 2)
.go();
}
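// A provided inline schema overrides the table types and supplies a default value for nulls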
@Test
public void testSchemaProvisioning() throws Exception {
String query = "select int_field, string_field from table(dfs.tmp.testAllTypes(schema => 'inline=(int_field varchar not null default `error`)'))";
queryBuilder()
.sql(query)
.planMatcher()
.include("projection\\=struct<1: int_field: optional int, 5: string_field: optional string>")
.match();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("int_field", "string_field")
.baselineValues("1", "abc")
.baselineValues("error", null)
.baselineValues("988", "def")
.go();
}
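// Tables whose data files were written in Avro and ORC are read through the same Iceberg format plugin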
@Test
public void testSelectAvroFormat() throws Exception {
testBuilder()
.sqlQuery("select * from dfs.tmp.testAllTypesAvro")
.unOrdered()
.baselineColumns("struct_int_field", "struct_string_field")
.baselineValues(123, "abc")
.baselineValues(null, null)
.baselineValues(321, "def")
.go();
}
@Test
public void testSelectOrcFormat() throws Exception {
testBuilder()
.sqlQuery("select * from dfs.tmp.testAllTypesOrc")
.unOrdered()
.baselineColumns("struct_int_field", "struct_string_field")
.baselineValues(123, "abc")
.baselineValues(null, null)
.baselineValues(321, "def")
.go();
}
@Test
public void testSelectEmptyTable() throws Exception {
testBuilder()
.sqlQuery("select * from dfs.tmp.testAllTypesEmpty")
.unOrdered()
.expectsEmptyResultSet()
.go();
}
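// LIMIT is pushed into the Iceberg scan as maxRecords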
@Test
public void testLimit() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes limit 1";
queryBuilder()
.sql(query)
.planMatcher()
.include("Limit\\(fetch\\=\\[1\\]\\)")
.include("maxRecords\\=1")
.match();
long count = queryBuilder().sql(query).run().recordCount();
assertEquals(1, count);
}
@Test
public void testLimitWithFilter() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes where int_field = 1 limit 1";
queryBuilder()
.sql(query)
.planMatcher()
.include("Limit\\(fetch\\=\\[1\\]\\)")
.include("maxRecords\\=1")
.include("filter\\=ref\\(name=\"int_field\"\\) \\=\\= 1")
.match();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("int_field", "string_field")
.baselineValues(1, "abc")
.go();
}
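// With an ORDER BY the limit is not pushed into the scan (maxRecords=-1) since the sort needs all rows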
@Test
public void testNoLimitWithSort() throws Exception {
String query = "select int_field, string_field from dfs.tmp.testAllTypes order by int_field limit 1";
queryBuilder()
.sql(query)
.planMatcher()
.include("Limit\\(fetch\\=\\[1\\]\\)")
.include("maxRecords\\=-1")
.match();
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("int_field", "string_field")
.baselineValues(1, "abc")
.go();
}
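// Lateral join / UNNEST: first a sanity check on a classpath JSON table, then the same pattern on the Iceberg table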
@Test
public void testLateralSql() throws Exception {
String sql =
"SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
"FROM cp.`lateraljoin/nested-customer.json` t,\n" +
"unnest(t.orders) t2(ord)\n" +
"LIMIT 1";
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("c_name", "o_shop")
.baselineValues("customer1", "Meno Park 1st")
.go();
}
@Test
public void testLateralSqlIceberg() throws Exception {
String sql =
"SELECT t.int_field, t2.ord.struct_string_field struct_string_field\n" +
"FROM dfs.tmp.testAllTypes t,\n" +
"unnest(t.repeated_struct_field) t2(ord)\n" +
"ORDER BY t.int_field\n" +
"LIMIT 1";
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("int_field", "struct_string_field")
.baselineValues(1, "abc")
.go();
}
@Test
public void testFilterPushCorrelate() throws Exception {
String sql =
"SELECT t.c_name, t2.ord.o_shop AS o_shop\n" +
"FROM cp.`lateraljoin/nested-customer.json` t,\n" +
"unnest(t.orders) t2(ord)\n" +
"WHERE t.c_name='customer1' AND t2.ord.o_shop='Meno Park 1st'";
testBuilder()
.unOrdered()
.sqlQuery(sql)
.baselineColumns("c_name", "o_shop")
.baselineValues("customer1", "Meno Park 1st")
.go();
}
}