blob: 5fc8ccaf6a1511ac65db128aade27e979c81afa6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connectors.kudu.table.utils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connectors.kudu.connector.ColumnSchemasFactory;
import org.apache.flink.connectors.kudu.connector.CreateTableOptionsFactory;
import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
import org.apache.flink.connectors.kudu.table.KuduTableFactory;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.client.CreateTableOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
/**
 * Static helpers for translating between Flink table definitions and Kudu table metadata.
 */
public class KuduTableUtils {

    private static final Logger LOG = LoggerFactory.getLogger(KuduTableUtils.class);

    /**
     * Builds the {@link KuduTableInfo} for a Flink table.
     *
     * <p>The presence of the {@code KUDU_HASH_COLS} property is used to decide whether the table should be
     * created when missing, since hash columns are required for table creation. When creation is enabled:
     * <ul>
     *     <li>the Kudu columns are derived from the physical columns of {@code schema}, with timestamp
     *     columns bridged to {@link Timestamp} (see {@link #getSchemaWithSqlTimestamp});</li>
     *     <li>the key columns come from {@link #getPrimaryKeyColumns};</li>
     *     <li>the table is hash-partitioned on {@link #getHashColumns} into {@code 2 * replicas} buckets,
     *     where {@code replicas} is read from {@code KUDU_REPLICAS} and defaults to 1.</li>
     * </ul>
     *
     * @param tableName name of the Kudu table
     * @param schema    Flink schema of the table
     * @param props     table properties
     * @return the table info; configured to create the table if it does not exist when hash columns are given
     * @throws IllegalArgumentException if creation is requested but no key columns can be determined, or if
     *                                  the replica count is not a positive integer
     */
    public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
        KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);

        // Since KUDU_HASH_COLS is a required property for table creation, we use it to infer whether to
        // create the table.
        if (!props.containsKey(KUDU_HASH_COLS)) {
            LOG.debug("Property {} is missing, assuming the table is already created.", KUDU_HASH_COLS);
            return tableInfo;
        }

        List<Tuple2<String, DataType>> columns = getSchemaWithSqlTimestamp(schema)
                .getTableColumns()
                .stream()
                .map(tc -> Tuple2.of(tc.getName(), tc.getType()))
                .collect(Collectors.toList());
        // Key columns are resolved against the original schema: the rewritten schema above carries no
        // primary key constraint.
        List<String> keyColumns = getPrimaryKeyColumns(props, schema);
        ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);

        List<String> hashColumns = getHashColumns(props);
        int replicas = Optional.ofNullable(props.get(KuduTableFactory.KUDU_REPLICAS))
                .map(String::trim)
                .map(Integer::parseInt)
                .orElse(1);
        if (replicas < 1) {
            // A non-positive replica count would also yield a non-positive number of hash buckets below.
            throw new IllegalArgumentException("Property " + KuduTableFactory.KUDU_REPLICAS
                    + " must be a positive integer, but was " + replicas);
        }
        CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
                .setNumReplicas(replicas)
                .addHashPartitions(hashColumns, replicas * 2);

        tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
        return tableInfo;
    }

    /**
     * Converts (name, Flink type) pairs into Kudu column schemas.
     *
     * <p>Columns whose name is in {@code keyColumns} are marked as key columns and are always declared
     * non-nullable. Other columns are nullable exactly when their Flink logical type is nullable. Decimal
     * columns carry their precision and scale as Kudu type attributes.
     *
     * @param columns    column names paired with their Flink data types, in table order
     * @param keyColumns names of the key columns
     * @return the Kudu column schemas, in the same order as {@code columns}
     */
    public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns, Collection<String> keyColumns) {
        return columns.stream()
                .map(column -> toKuduColumn(column.f0, column.f1, keyColumns.contains(column.f0)))
                .collect(Collectors.toList());
    }

    /** Builds the Kudu column schema for a single Flink column. */
    private static ColumnSchema toKuduColumn(String name, DataType type, boolean isKey) {
        ColumnSchema.ColumnSchemaBuilder builder = new ColumnSchema
                .ColumnSchemaBuilder(name, KuduTypeUtils.toKuduType(type))
                .key(isKey)
                // Key columns are forced to non-nullable, whatever the Flink type's nullability says.
                .nullable(!isKey && type.getLogicalType().isNullable());
        if (type.getLogicalType() instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) type.getLogicalType();
            builder.typeAttributes(new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                    .precision(decimalType.getPrecision())
                    .scale(decimalType.getScale())
                    .build());
        }
        return builder.build();
    }

    /**
     * Converts a Kudu schema into a Flink {@link TableSchema}.
     *
     * <p>Every resulting field is marked nullable, regardless of the Kudu column's nullability.
     *
     * @param schema the Kudu schema
     * @return a Flink schema with one field per Kudu column, in Kudu column order
     */
    public static TableSchema kuduToFlinkSchema(Schema schema) {
        TableSchema.Builder builder = TableSchema.builder();
        for (ColumnSchema column : schema.getColumns()) {
            DataType flinkType = KuduTypeUtils.toFlinkType(column.getType(), column.getTypeAttributes()).nullable();
            builder.field(column.getName(), flinkType);
        }
        return builder.build();
    }

    /**
     * Determines the primary key columns of a table.
     *
     * <p>The comma-separated {@code KUDU_PRIMARY_KEY_COLS} property takes precedence; whitespace around
     * names is trimmed and empty entries are ignored. Otherwise the PRIMARY KEY constraint declared in the
     * Flink schema is used.
     *
     * @param tableProperties table properties
     * @param tableSchema     Flink schema, consulted when the property is absent
     * @return the key column names
     * @throws IllegalArgumentException if neither the property nor a schema primary key is defined
     */
    public static List<String> getPrimaryKeyColumns(Map<String, String> tableProperties, TableSchema tableSchema) {
        String keyColumns = tableProperties.get(KUDU_PRIMARY_KEY_COLS);
        if (keyColumns != null) {
            return splitColumnNames(keyColumns);
        }
        return tableSchema.getPrimaryKey()
                .map(primaryKey -> primaryKey.getColumns())
                .orElseThrow(() -> new IllegalArgumentException("No primary key columns defined: set property "
                        + KUDU_PRIMARY_KEY_COLS + " or declare a PRIMARY KEY in the table schema"));
    }

    /**
     * Reads the hash partitioning columns from the comma-separated {@code KUDU_HASH_COLS} property.
     *
     * <p>Whitespace around names is trimmed and empty entries are ignored.
     *
     * @param tableProperties table properties
     * @return a mutable list of hash column names
     * @throws IllegalArgumentException if the property is missing
     */
    public static List<String> getHashColumns(Map<String, String> tableProperties) {
        String hashColumns = tableProperties.get(KUDU_HASH_COLS);
        if (hashColumns == null) {
            throw new IllegalArgumentException("Missing required property " + KUDU_HASH_COLS);
        }
        return splitColumnNames(hashColumns);
    }

    /** Splits a comma-separated list of column names, trimming each name and skipping empty entries. */
    private static List<String> splitColumnNames(String value) {
        return Lists.newArrayList(Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList()));
    }

    /**
     * Returns a copy of the physical columns of {@code schema} (as selected by
     * {@code TableSchemaUtils.getPhysicalSchema}), with every {@link TimestampType} column bridged to
     * {@link Timestamp}. Only fields are copied; constraints such as the primary key are not.
     *
     * @param schema the source Flink schema
     * @return the rewritten schema
     */
    public static TableSchema getSchemaWithSqlTimestamp(TableSchema schema) {
        TableSchema.Builder builder = TableSchema.builder();
        TableSchemaUtils.getPhysicalSchema(schema).getTableColumns().forEach(tableColumn -> {
            DataType type = tableColumn.getType();
            DataType bridged = type.getLogicalType() instanceof TimestampType ? type.bridgedTo(Timestamp.class) : type;
            builder.field(tableColumn.getName(), bridged);
        });
        return builder.build();
    }
}