blob: 4aca1a8c69ecafe778bea7eccd1f381e55ecc34d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store;
import org.apache.drill.PlanTestBase;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.avro.AvroDataGenerator;
import org.junit.Test;
import java.nio.file.Paths;
import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_TMP_SCHEMA;
public class FormatPluginSerDeTest extends PlanTestBase {
@Test
public void testParquet() throws Exception {
try {
setSessionOption(ExecConstants.SLICE_TARGET, 1);
testPhysicalPlanSubmission(
String.format("select * from table(cp.`%s`(type=>'parquet'))", "parquet/alltypes_required.parquet"),
String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false))", "parquet/alltypes_required.parquet"),
String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", "parquet/alltypes_required.parquet"),
String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>true, enableStringsSignedMinMax=>false))", "parquet/alltypes_required.parquet"),
String.format("select * from table(cp.`%s`(type=>'parquet', autoCorrectCorruptDates=>false, enableStringsSignedMinMax=>true))", "parquet/alltypes_required.parquet"));
} finally {
resetSessionOption(ExecConstants.SLICE_TARGET);
}
}
@Test
public void testParquetWithMetadata() throws Exception {
String tableName = "alltypes_required_with_metadata";
test("use %s", DFS_TMP_SCHEMA);
try {
test("create table %s as select * from cp.`parquet/alltypes_required.parquet`", tableName);
test("refresh table metadata %s", tableName);
setSessionOption(ExecConstants.SLICE_TARGET, 1);
testPhysicalPlanSubmission(
String.format("select * from table(`%s`(type=>'parquet'))", tableName),
String.format("select * from table(`%s`(type=>'parquet', autoCorrectCorruptDates=>false))", tableName),
String.format("select * from table(`%s`(type=>'parquet', autoCorrectCorruptDates=>true))", tableName));
} finally {
resetSessionOption(ExecConstants.SLICE_TARGET);
test("drop table if exists %s", tableName);
}
}
@Test
public void testAvro() throws Exception {
AvroDataGenerator dataGenerator = new AvroDataGenerator(dirTestWatcher);
String file = dataGenerator.generateSimplePrimitiveSchema_NoNullValues(5).getFileName();
testPhysicalPlanSubmission(
String.format("select * from dfs.`%s`", file),
String.format("select * from table(dfs.`%s`(type=>'avro'))", file)
);
}
@Test
public void testSequenceFile() throws Exception {
String path = "sequencefiles/simple.seq";
dirTestWatcher.copyResourceToRoot(Paths.get(path));
testPhysicalPlanSubmission(
String.format("select * from dfs.`%s`", path),
String.format("select * from table(dfs.`%s`(type=>'sequencefile'))", path)
);
}
@Test
public void testPcap() throws Exception {
String path = "store/pcap/tcp-1.pcap";
dirTestWatcher.copyResourceToRoot(Paths.get(path));
testPhysicalPlanSubmission(
String.format("select * from dfs.`%s`", path),
String.format("select * from table(dfs.`%s`(type=>'pcap'))", path)
);
}
@Test
public void testHttpd() throws Exception {
String path = "store/httpd/dfs-test-bootstrap-test.httpd";
dirTestWatcher.copyResourceToRoot(Paths.get(path));
String logFormat = "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\"";
String timeStampFormat = "dd/MMM/yyyy:HH:mm:ss ZZ";
testPhysicalPlanSubmission(
String.format("select * from dfs.`%s`", path),
String.format("select * from table(dfs.`%s`(type=>'httpd', logFormat=>'%s'))", path, logFormat),
String.format("select * from table(dfs.`%s`(type=>'httpd', logFormat=>'%s', timestampFormat=>'%s'))", path, logFormat, timeStampFormat)
);
}
@Test
public void testJson() throws Exception {
testPhysicalPlanSubmission(
"select * from cp.`donuts.json`",
"select * from table(cp.`donuts.json`(type=>'json'))"
);
}
@Test
public void testText() throws Exception {
String path = "store/text/data/regions.csv";
dirTestWatcher.copyResourceToRoot(Paths.get(path));
testPhysicalPlanSubmission(
String.format("select * from table(dfs.`%s`(type => 'text'))", path),
String.format("select * from table(dfs.`%s`(type => 'text', extractHeader => false, fieldDelimiter => 'A'))", path)
);
}
@Test
public void testNamed() throws Exception {
String path = "store/text/WithQuote.tbl";
dirTestWatcher.copyResourceToRoot(Paths.get(path));
String query = String.format("select * from table(dfs.`%s`(type=>'named', name=>'psv'))", path);
testPhysicalPlanSubmission(query);
}
private void testPhysicalPlanSubmission(String...queries) throws Exception {
for (String query : queries) {
PlanTestBase.testPhysicalPlanExecutionBasedOnQuery(query);
}
}
}