/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sqoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.testutil.CommonArgs;
import org.apache.sqoop.testutil.HsqldbTestServer;
import org.apache.sqoop.testutil.ImportJobTestCase;
import org.apache.sqoop.util.ParquetReader;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.sqoop.avro.AvroUtil.getAvroSchemaFromParquetFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

/**
 * Tests --as-parquetfile.
 *
 * The suite is parameterized so that every test case runs once against the
 * Kite-based and once against the Hadoop-based Parquet import implementation.
 */
@RunWith(Parameterized.class)
public class TestParquetImport extends ImportJobTestCase {

  public static final Log LOG = LogFactory
      .getLog(TestParquetImport.class.getName());

  private static final String PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE = "kite";
  private static final String PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP = "hadoop";

  @Parameters(name = "parquetImplementation = {0}")
  public static Iterable<? extends Object> parquetImplementationParameters() {
    return Arrays.asList(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE,
        PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP);
  }

  private final String parquetImplementation;

  public TestParquetImport(String parquetImplementation) {
    this.parquetImplementation = parquetImplementation;
  }

  /**
   * Create the argv to pass to Sqoop.
   *
   * @return the argv as an array of strings.
   */
  protected String[] getOutputArgv(boolean includeHadoopFlags,
      String[] extraArgs) {
    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }
    args.add("--table");
    args.add(getTableName());
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--warehouse-dir");
    args.add(getWarehouseDir());
    args.add("--m");
    args.add("1");
    args.add("--split-by");
    args.add("INTFIELD1");
    args.add("--as-parquetfile");
    if (extraArgs != null) {
      args.addAll(Arrays.asList(extraArgs));
    }

    return args.toArray(new String[args.size()]);
  }

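  // For reference, the argv built above corresponds to a command line of
  // roughly this shape (an illustrative sketch; the actual connect URL comes
  // from HsqldbTestServer and the table name is generated per test):
  //
  //   sqoop import --table <table> --connect jdbc:hsqldb:... \
  //     --warehouse-dir <dir> --m 1 --split-by INTFIELD1 --as-parquetfile

  /**
   * Create the argv for a free-form query import ({@code --query}) of the
   * test table.
   *
   * @return the argv as an array of strings.
   */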
  protected String[] getOutputQueryArgv(boolean includeHadoopFlags, String[] extraArgs) {
    ArrayList<String> args = new ArrayList<String>();

    if (includeHadoopFlags) {
      CommonArgs.addHadoopFlags(args);
    }
    args.add("--query");
    args.add("SELECT * FROM " + getTableName() + " WHERE $CONDITIONS");
    args.add("--connect");
    args.add(HsqldbTestServer.getUrl());
    args.add("--target-dir");
    args.add(getWarehouseDir() + "/" + getTableName());
    args.add("--m");
    args.add("1");
    args.add("--split-by");
    args.add("INTFIELD1");
    args.add("--as-parquetfile");
    if (extraArgs != null) {
      args.addAll(Arrays.asList(extraArgs));
    }

    return args.toArray(new String[args.size()]);
  }

  @Test
  public void testSnappyCompression() throws IOException {
    runParquetImportTest("snappy");
  }

  @Test
  public void testHadoopGzipCompression() throws IOException {
    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));

    runParquetImportTest("gzip");
  }

  @Test
  public void testKiteDeflateCompression() throws IOException {
    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_KITE.equals(parquetImplementation));

    // The current Kite-based Parquet writing implementation uses the GZIP
    // compression codec when deflate is specified.
    // See: org.kitesdk.data.spi.filesystem.ParquetAppender.getCompressionCodecName()
    runParquetImportTest("deflate", "gzip");
  }

  /**
   * This test case documents that the deflate codec is not supported by the
   * Hadoop Parquet implementation: Sqoop throws an exception when it is
   * specified.
   *
   * @throws IOException when the unsupported codec is rejected (the expected outcome)
   */
  @Test(expected = IOException.class)
  public void testHadoopDeflateCompression() throws IOException {
    assumeTrue(PARQUET_CONFIGURATOR_IMPLEMENTATION_HADOOP.equals(parquetImplementation));

    runParquetImportTest("deflate");
  }

  private void runParquetImportTest(String codec) throws IOException {
    runParquetImportTest(codec, codec);
  }

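  /**
   * Imports a one-row table covering several SQL types with the given
   * compression codec, then verifies the codec recorded in the Parquet file,
   * the generated Avro schema and the values of the imported record.
   */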
  private void runParquetImportTest(String codec, String expectedCodec) throws IOException {
    String[] types = {"BIT", "INTEGER", "BIGINT", "REAL", "DOUBLE", "VARCHAR(6)",
        "VARBINARY(2)"};
    String[] vals = {"true", "100", "200", "1.0", "2.0", "'s'", "'0102'"};
    createTableWithColTypes(types, vals);

    String[] extraArgs = {"--compression-codec", codec};
    runImport(getOutputArgv(true, extraArgs));

    // The codec name stored in the Parquet file metadata is upper case.
    ParquetReader parquetReader = new ParquetReader(getTablePath());
    assertEquals(expectedCodec.toUpperCase(), parquetReader.getCodec().name());

    // Every column is expected to map to a nullable Avro field of the
    // corresponding type.
    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.BOOLEAN);
    checkField(fields.get(1), "DATA_COL1", Type.INT);
    checkField(fields.get(2), "DATA_COL2", Type.LONG);
    checkField(fields.get(3), "DATA_COL3", Type.FLOAT);
    checkField(fields.get(4), "DATA_COL4", Type.DOUBLE);
    checkField(fields.get(5), "DATA_COL5", Type.STRING);
    checkField(fields.get(6), "DATA_COL6", Type.BYTES);

    List<GenericRecord> genericRecords = parquetReader.readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertNotNull(record1);
    assertEquals("DATA_COL0", true, record1.get("DATA_COL0"));
    assertEquals("DATA_COL1", 100, record1.get("DATA_COL1"));
    assertEquals("DATA_COL2", 200L, record1.get("DATA_COL2"));
    assertEquals("DATA_COL3", 1.0f, record1.get("DATA_COL3"));
    assertEquals("DATA_COL4", 2.0, record1.get("DATA_COL4"));
    assertEquals("DATA_COL5", "s", record1.get("DATA_COL5"));
    Object object = record1.get("DATA_COL6");
    assertTrue(object instanceof ByteBuffer);
    ByteBuffer b = ((ByteBuffer) object);
    assertEquals((byte) 1, b.get(0));
    assertEquals((byte) 2, b.get(1));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testOverrideTypeMapping() throws IOException {
    String[] types = { "INT" };
    String[] vals = { "10" };
    createTableWithColTypes(types, vals);

    String[] extraArgs = { "--map-column-java", "DATA_COL0=String" };
    runImport(getOutputArgv(true, extraArgs));

    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "DATA_COL0", Type.STRING);

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("DATA_COL0", "10", record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testFirstUnderscoreInColumnName() throws IOException {
    String[] names = { "_NAME" };
    String[] types = { "INT" };
    String[] vals = { "1987" };
    createTableWithColTypesAndNames(names, types, vals);

    runImport(getOutputArgv(true, null));

    // A column name starting with an underscore gets a second underscore
    // prepended in the generated Avro field name.
    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "__NAME", Type.INT);

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("__NAME", 1987, record1.get("__NAME"));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testNonIdentCharactersInColumnName() throws IOException {
    String[] names = { "test_p-a+r/quet" };
    String[] types = { "INT" };
    String[] vals = { "2015" };
    createTableWithColTypesAndNames(names, types, vals);

    runImport(getOutputArgv(true, null));

    // Characters that are not valid in Avro identifiers ('-', '+', '/') are
    // replaced with underscores in the generated field name.
    Schema schema = getAvroSchemaFromParquetFile(getTablePath(), getConf());
    assertEquals(Type.RECORD, schema.getType());
    List<Field> fields = schema.getFields();
    assertEquals(types.length, fields.size());
    checkField(fields.get(0), "TEST_P_A_R_QUET", Type.INT);

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testNullableParquetImport() throws IOException, SQLException {
    String[] types = { "INT" };
    String[] vals = { null };
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertNull(record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testQueryImport() throws IOException, SQLException {
    String[] types = { "INT" };
    String[] vals = { "1" };
    createTableWithColTypes(types, vals);

    runImport(getOutputQueryArgv(true, null));

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals(1, record1.get("DATA_COL0"));
    assertEquals(1, genericRecords.size());
  }

  @Test
  public void testIncrementalParquetImport() throws IOException, SQLException {
    String[] types = { "INT" };
    String[] vals = { "1" };
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));
    // The second import runs with --append, so its output is added to the
    // same directory and two identical records are expected.
    runImport(getOutputArgv(true, new String[]{"--append"}));

    List<GenericRecord> genericRecords = new ParquetReader(getTablePath()).readAll();
    GenericRecord record1 = genericRecords.get(0);
    assertEquals(1, record1.get("DATA_COL0"));
    record1 = genericRecords.get(1);
    assertEquals(1, record1.get("DATA_COL0"));
    assertEquals(2, genericRecords.size());
  }

  @Test
  public void testOverwriteParquetDatasetFail() throws IOException, SQLException {
    String[] types = { "INT" };
    String[] vals = {};
    createTableWithColTypes(types, vals);

    runImport(getOutputArgv(true, null));
    try {
      // A second import into the same target must fail because the dataset
      // already exists.
      runImport(getOutputArgv(true, null));
      fail("The second import should have thrown an IOException since the target dataset already exists.");
    } catch (IOException ex) {
      // expected
    }
  }

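  /**
   * Asserts that the field has the expected name and is declared as a
   * nullable union, i.e. its Avro schema is [null, type].
   */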
  private void checkField(Field field, String name, Type type) {
    assertEquals(name, field.name());
    assertEquals(Type.UNION, field.schema().getType());
    assertEquals(Type.NULL, field.schema().getTypes().get(0).getType());
    assertEquals(type, field.schema().getTypes().get(1).getType());
  }

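  /**
   * Extends the base configuration with the parameterized Parquet
   * implementation ("kite" or "hadoop") so that each test runs against the
   * implementation under test.
   */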
  @Override
  protected Configuration getConf() {
    Configuration conf = super.getConf();
    conf.set("parquetjob.configurator.implementation", parquetImplementation);
    return conf;
  }
}