blob: a0ba00c93070c4b7d63e29f6ab5139047291144d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.catalog.hive;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.config.CatalogConfig;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;

import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.junit.Assert.assertEquals;
/**
 * Test for data type mappings in HiveCatalog.
 *
 * <p>Each test builds a non-generic table whose columns use the Flink data types under test,
 * creates it in {@code db1}, and then either compares the schema read back from the catalog with
 * the schema that was written, or expects table creation to fail with a specific exception.
 */
public class HiveCatalogDataTypeTest {

    /** Catalog shared by all tests; opened in {@link #init()} and closed in {@link #closeup()}. */
    private static HiveCatalog catalog;

    protected final String db1 = "db1";
    protected final String db2 = "db2";

    protected final String t1 = "t1";
    protected final String t2 = "t2";

    protected final ObjectPath path1 = new ObjectPath(db1, t1);
    protected final ObjectPath path2 = new ObjectPath(db2, t2);
    protected final ObjectPath path3 = new ObjectPath(db1, t2);

    @Rule public ExpectedException exception = ExpectedException.none();

    /** Creates and opens the catalog instance used by every test in this class. */
    @BeforeClass
    public static void init() {
        catalog = HiveTestUtils.createHiveCatalog();
        catalog.open();
    }

    /**
     * Removes the tables, function and databases addressed by the fixture paths, if they exist.
     *
     * <p>Tables and the function are dropped before the databases that contain them.
     */
    @After
    public void cleanup() throws Exception {
        if (catalog.tableExists(path1)) {
            catalog.dropTable(path1, true);
        }
        if (catalog.tableExists(path2)) {
            catalog.dropTable(path2, true);
        }
        if (catalog.tableExists(path3)) {
            catalog.dropTable(path3, true);
        }
        if (catalog.functionExists(path1)) {
            catalog.dropFunction(path1, true);
        }
        if (catalog.databaseExists(db1)) {
            catalog.dropDatabase(db1, true);
        }
        if (catalog.databaseExists(db2)) {
            catalog.dropDatabase(db2, true);
        }
    }

    /** Closes the shared catalog; tolerates the case where {@link #init()} never assigned it. */
    @AfterClass
    public static void closeup() {
        if (catalog != null) {
            catalog.close();
        }
    }

    /** Verifies that a table with primitive-typed columns is read back with an equal schema. */
    @Test
    public void testDataTypes() throws Exception {
        DataType[] types =
                new DataType[] {
                    DataTypes.TINYINT(),
                    DataTypes.SMALLINT(),
                    DataTypes.INT(),
                    DataTypes.BIGINT(),
                    DataTypes.FLOAT(),
                    DataTypes.DOUBLE(),
                    DataTypes.BOOLEAN(),
                    DataTypes.STRING(),
                    DataTypes.BYTES(),
                    DataTypes.DATE(),
                    DataTypes.TIMESTAMP(9),
                    DataTypes.CHAR(HiveChar.MAX_CHAR_LENGTH),
                    DataTypes.VARCHAR(HiveVarchar.MAX_VARCHAR_LENGTH),
                    DataTypes.DECIMAL(5, 3)
                };

        verifyDataTypes(types);
    }

    /**
     * Expects {@code createTable} to throw {@link UnsupportedOperationException} for a {@code
     * BINARY(BinaryType.MAX_LENGTH)} column.
     */
    @Test
    public void testNonSupportedBinaryDataTypes() throws Exception {
        DataType[] types = new DataType[] {DataTypes.BINARY(BinaryType.MAX_LENGTH)};

        CatalogTable table = createCatalogTable(types);

        catalog.createDatabase(db1, createDb(), false);

        // Register the expectation only now so that a failure in the setup above is not
        // mistaken for the expected exception.
        exception.expect(UnsupportedOperationException.class);
        catalog.createTable(path1, table, false);
    }

    /**
     * Expects {@code createTable} to throw {@link UnsupportedOperationException} for a {@code
     * VARBINARY(20)} column.
     */
    @Test
    public void testNonSupportedVarBinaryDataTypes() throws Exception {
        DataType[] types = new DataType[] {DataTypes.VARBINARY(20)};

        CatalogTable table = createCatalogTable(types);

        catalog.createDatabase(db1, createDb(), false);

        // Register the expectation only now so that a failure in the setup above is not
        // mistaken for the expected exception.
        exception.expect(UnsupportedOperationException.class);
        catalog.createTable(path1, table, false);
    }

    /**
     * Expects a {@link CatalogException} for a CHAR column whose length is one greater than
     * {@link HiveChar#MAX_CHAR_LENGTH}.
     */
    @Test
    public void testCharTypeLength() throws Exception {
        DataType[] types = new DataType[] {DataTypes.CHAR(HiveChar.MAX_CHAR_LENGTH + 1)};

        exception.expect(CatalogException.class);
        verifyDataTypes(types);
    }

    /**
     * Expects a {@link CatalogException} for a VARCHAR column whose length is one greater than
     * {@link HiveVarchar#MAX_VARCHAR_LENGTH}.
     */
    @Test
    public void testVarCharTypeLength() throws Exception {
        DataType[] types = new DataType[] {DataTypes.VARCHAR(HiveVarchar.MAX_VARCHAR_LENGTH + 1)};

        exception.expect(CatalogException.class);
        verifyDataTypes(types);
    }

    /**
     * Verifies that a table with ARRAY, MAP and ROW columns, including nested combinations of
     * them, is read back with an equal schema.
     */
    @Test
    public void testComplexDataTypes() throws Exception {
        DataType[] types =
                new DataType[] {
                    DataTypes.ARRAY(DataTypes.DOUBLE()),
                    DataTypes.MAP(DataTypes.FLOAT(), DataTypes.BIGINT()),
                    DataTypes.ROW(
                            DataTypes.FIELD("0", DataTypes.BOOLEAN()),
                            DataTypes.FIELD("1", DataTypes.BOOLEAN()),
                            DataTypes.FIELD("2", DataTypes.DATE())),

                    // nested complex types
                    DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
                    DataTypes.MAP(
                            DataTypes.STRING(),
                            DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT())),
                    DataTypes.ROW(
                            DataTypes.FIELD("3", DataTypes.ARRAY(DataTypes.DECIMAL(5, 3))),
                            DataTypes.FIELD(
                                    "4", DataTypes.MAP(DataTypes.TINYINT(), DataTypes.SMALLINT())),
                            DataTypes.FIELD(
                                    "5",
                                    DataTypes.ROW(DataTypes.FIELD("3", DataTypes.TIMESTAMP(9)))))
                };

        verifyDataTypes(types);
    }

    /**
     * Builds a non-generic catalog table with one column per entry of {@code types}.
     *
     * <p>Column {@code i} is named after the lower-cased string form of {@code types[i]} followed
     * by {@code _i}, which keeps the names unique even when a type occurs more than once.
     *
     * @param types data types of the columns, in column order
     * @return a table with the generated schema, the properties {@code is_streaming=false} and
     *     {@code CatalogConfig.IS_GENERIC=false}, and an empty comment
     */
    private CatalogTable createCatalogTable(DataType[] types) {
        String[] colNames = new String[types.length];

        for (int i = 0; i < types.length; i++) {
            // Locale.ROOT and plain concatenation keep the generated names independent of the
            // JVM's default locale (e.g. Turkish dotless-i lower-casing or localized digits).
            colNames[i] = types[i].toString().toLowerCase(Locale.ROOT) + "_" + i;
        }

        TableSchema schema = TableSchema.builder().fields(colNames, types).build();

        Map<String, String> properties = new HashMap<>();
        properties.put("is_streaming", "false");
        properties.put(CatalogConfig.IS_GENERIC, String.valueOf(false));

        return new CatalogTableImpl(schema, properties, "");
    }

    /**
     * Creates {@code db1} and a table at {@code path1} with the given column types, then asserts
     * that the schema returned by the catalog equals the schema that was submitted.
     *
     * @param types data types of the columns, in column order
     */
    private void verifyDataTypes(DataType[] types) throws Exception {
        CatalogTable table = createCatalogTable(types);

        catalog.createDatabase(db1, createDb(), false);
        catalog.createTable(path1, table, false);

        assertEquals(table.getSchema(), catalog.getTable(path1).getSchema());
    }

    /**
     * Creates a database definition with the single property {@code k1=v1} and an empty comment.
     */
    private static CatalogDatabase createDb() {
        Map<String, String> properties = new HashMap<>();
        properties.put("k1", "v1");

        return new CatalogDatabaseImpl(properties, "");
    }
}