/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.delta;

import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;

import java.math.BigDecimal;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
import static org.junit.Assert.assertEquals;
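
/**
 * Tests for querying Delta Lake tables through the {@code delta} format plugin,
 * which is registered on the {@code dfs} storage plugin in {@link #setUpBeforeClass()}.
 */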
public class DeltaQueriesTest extends ClusterTest {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    startCluster(ClusterFixture.builder(dirTestWatcher));

    // Register the Delta format plugin by cloning the dfs plugin config
    // with a "delta" entry added to its format map.
    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
    formats.put("delta", new DeltaFormatPluginConfig(null, null));
    FileSystemConfig newPluginConfig = new FileSystemConfig(
      pluginConfig.getConnection(),
      pluginConfig.getConfig(),
      pluginConfig.getWorkspaces(),
      formats,
      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
    newPluginConfig.setEnabled(pluginConfig.isEnabled());
    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);

    // Copy the sample Delta tables into the test root so they resolve under dfs.
    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives"));
    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values"));
    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct"));
  }
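
  /**
   * Round-trips the plan through JSON: {@code explainJson()} serializes the
   * plan, including the Delta group scan, and {@code physical(plan)} executes
   * the deserialized copy. The slice target of 1 encourages parallelization
   * so that exchanges appear in the plan.
   */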
  @Test
  public void testSerDe() throws Exception {
    client.alterSession(ExecConstants.SLICE_TARGET, 1);
    String plan = queryBuilder().sql("select * from table(dfs.`data-reader-partition-values` (type => 'delta'))").explainJson();
    long count = queryBuilder().physical(plan).run().recordCount();
    assertEquals(3, count);
  }
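
  /**
   * Reads every primitive column of the {@code data-reader-primitives} table,
   * including a row of all nulls, and checks the typed values of all ten rows.
   */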
  @Test
  public void testAllPrimitives() throws Exception {
    testBuilder()
      .sqlQuery("select * from dfs.`data-reader-primitives`")
      .ordered()
      .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float",
        "as_double", "as_string", "as_binary", "as_big_decimal")
      .baselineValues(null, null, null, null, null, null, null, null, null, null)
      .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0))
      .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1))
      .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2))
      .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3))
      .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4))
      .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5))
      .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6))
      .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7))
      .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8))
      .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9))
      .go();
  }
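
  /**
   * Verifies projection pushdown: only {@code as_int} and {@code as_string}
   * should appear in the scan's column list.
   */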
  @Test
  public void testProjectingColumns() throws Exception {
    String query = "select as_int, as_string from dfs.`data-reader-primitives`";

    queryBuilder()
      .sql(query)
      .planMatcher()
      .include("columns=\\[`as_int`, `as_string`\\]")
      .match();

    testBuilder()
      .sqlQuery(query)
      .unOrdered()
      .baselineColumns("as_int", "as_string")
      .baselineValues(null, null)
      .baselineValues(0, "0")
      .baselineValues(1, "1")
      .baselineValues(2, "2")
      .baselineValues(3, "3")
      .baselineValues(4, "4")
      .baselineValues(5, "5")
      .baselineValues(6, "6")
      .baselineValues(7, "7")
      .baselineValues(8, "8")
      .baselineValues(9, "9")
      .go();
  }
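
  /**
   * Verifies projection pushdown into a nested struct: the scan should project
   * only the {@code a.ac.acb} member and the top-level {@code b} column.
   */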
  @Test
  public void testProjectNestedColumn() throws Exception {
    String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t";

    queryBuilder()
      .sql(query)
      .planMatcher()
      .include("columns=\\[`a`.`ac`.`acb`, `b`\\]")
      .match();

    testBuilder()
      .sqlQuery(query)
      .unOrdered()
      .baselineColumns("acb", "b")
      .baselineValues(0L, 0)
      .baselineValues(1L, 1)
      .baselineValues(2L, 2)
      .baselineValues(3L, 3)
      .baselineValues(4L, 4)
      .baselineValues(5L, 5)
      .baselineValues(6L, 6)
      .baselineValues(7L, 7)
      .baselineValues(8L, 8)
      .baselineValues(9L, 9)
      .go();
  }
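
  /**
   * The equality filter on the partition column {@code as_long} should prune
   * the scan down to a single file.
   */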
  @Test
  public void testPartitionPruning() throws Exception {
    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1";

    queryBuilder()
      .sql(query)
      .planMatcher()
      .include("numFiles\\=1")
      .match();

    testBuilder()
      .sqlQuery(query)
      .ordered()
      .baselineColumns("as_int", "as_string")
      .baselineValues("1", "1")
      .go();
  }
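
  /**
   * A filter value that matches no partition should still plan successfully
   * and return an empty result set.
   */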
  @Test
  public void testEmptyResults() throws Exception {
    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101";

    queryBuilder()
      .sql(query)
      .planMatcher()
      .include("numFiles\\=1")
      .match();

    testBuilder()
      .sqlQuery(query)
      .ordered()
      .expectsEmptyResultSet()
      .go();
  }
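
  /**
   * Verifies limit pushdown into the Delta scan alongside the regular Limit
   * operator in the plan.
   */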
  @Test
  public void testLimit() throws Exception {
    String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1";

    // Both limit patterns are expected in the plan: the format plugin pushes an
    // "artificial" (best-effort) limit into the scan, while the Limit operator
    // remains to enforce the exact row count.
    queryBuilder()
      .sql(query)
      .planMatcher()
      .include("Limit\\(fetch\\=\\[1\\]\\)")
      .include("limit\\=1")
      .match();

    long count = queryBuilder().sql(query).run().recordCount();
    assertEquals(1, count);
  }
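
  /**
   * Uses the {@code version} table-function parameter to time-travel to
   * snapshot 0 of the Delta table instead of reading the latest version.
   */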
  @Test
  public void testSnapshotVersion() throws Exception {
    String query = "select as_int, as_string " +
      "from table(dfs.`data-reader-partition-values`(type => 'delta', version => 0)) where as_long = 1";

    testBuilder()
      .sqlQuery(query)
      .ordered()
      .baselineColumns("as_int", "as_string")
      .baselineValues("1", "1")
      .go();
  }
}