blob: d8cbd79373168b04e8bdefaaee26035e500d9288 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.testutils.tables;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
import org.apache.flink.connector.jdbc.utils.JdbcTypeUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** Base table operations. * */
public abstract class TableBase<T> implements TableManaged {
private final String name;
private final TableField[] fields;
protected TableBase(String name, TableField[] fields) {
Preconditions.checkArgument(name != null && !name.isEmpty(), "Table name must be defined");
Preconditions.checkArgument(
fields != null && fields.length != 0, "Table fields must be defined");
this.name = name;
this.fields = fields;
}
protected abstract JdbcResultSetBuilder<T> getResultSetBuilder();
public String getTableName() {
return name;
}
private Stream<TableField> getStreamFields() {
return Arrays.stream(this.fields);
}
private Stream<String> getStreamFieldNames() {
return getStreamFields().map(TableField::getName);
}
private Stream<DataType> getStreamDataTypes() {
return getStreamFields().map(TableField::getDataType);
}
public String[] getTableFields() {
return getStreamFieldNames().toArray(String[]::new);
}
public DataTypes.Field[] getTableDataFields() {
return getStreamFields()
.map(field -> DataTypes.FIELD(field.getName(), field.getDataType()))
.toArray(DataTypes.Field[]::new);
}
public DataType[] getTableDataTypes() {
return getStreamDataTypes().toArray(DataType[]::new);
}
public RowTypeInfo getTableRowTypeInfo() {
TypeInformation<?>[] typesArray =
getStreamDataTypes()
.map(TypeConversions::fromDataTypeToLegacyInfo)
.toArray(TypeInformation[]::new);
String[] fieldsArray = getTableFields();
return new RowTypeInfo(typesArray, fieldsArray);
}
public RowType getTableRowType() {
LogicalType[] typesArray =
getStreamDataTypes().map(DataType::getLogicalType).toArray(LogicalType[]::new);
String[] fieldsArray = getTableFields();
return RowType.of(typesArray, fieldsArray);
}
public int[] getTableTypes() {
return getStreamDataTypes()
.map(DataType::getLogicalType)
.map(LogicalType::getTypeRoot)
.map(JdbcTypeUtil::logicalTypeToSqlType)
.mapToInt(x -> x)
.toArray();
}
public Schema getTableSchema() {
Schema.Builder schema = Schema.newBuilder();
getStreamFields().forEach(field -> schema.column(field.getName(), field.getDataType()));
String pkFields =
getStreamFields()
.filter(TableField::isPkField)
.map(TableField::getName)
.collect(Collectors.joining(", "));
schema.primaryKeyNamed("PRIMARY", pkFields);
return schema.build();
}
public ResolvedSchema getTableResolvedSchema() {
return ResolvedSchema.of(
getStreamFields()
.map(field -> Column.physical(field.getName(), field.getDataType()))
.collect(Collectors.toList()));
}
public String getCreateQuery() {
String pkFields =
getStreamFields()
.filter(TableField::isPkField)
.map(TableField::getName)
.collect(Collectors.joining(", "));
return String.format(
"CREATE TABLE %s (%s%s)",
name,
getStreamFields().map(TableField::asString).collect(Collectors.joining(", ")),
pkFields.isEmpty() ? "" : String.format(", PRIMARY KEY (%s)", pkFields));
}
public String getCreateQueryForFlink(DatabaseMetadata metadata, String newName) {
return getCreateQueryForFlink(metadata, newName, Collections.emptyList());
}
public String getCreateQueryForFlink(
DatabaseMetadata metadata, String newName, List<String> withParams) {
return getCreateQueryForFlink(
metadata, newName, Arrays.asList(getTableFields()), withParams);
}
public String getCreateQueryForFlink(
DatabaseMetadata metadata,
String newName,
List<String> newFields,
List<String> withParams) {
Map<String, TableField> fieldsMap =
getStreamFields().collect(Collectors.toMap(TableField::getName, f -> f));
String fields =
newFields.stream()
.map(fieldsMap::get)
.map(field -> String.format("%s %s", field.getName(), field.getDataType()))
.collect(Collectors.joining(", "));
String pkFields =
getStreamFields()
.filter(TableField::isPkField)
.map(TableField::getName)
.collect(Collectors.joining(", "));
String primaryKey =
(pkFields.isEmpty())
? ""
: String.format(", PRIMARY KEY (%s) NOT ENFORCED", pkFields);
List<String> params = new ArrayList<>();
params.add("'connector'='jdbc'");
params.add(String.format("'table-name'='%s'", getTableName()));
params.add(String.format("'url'='%s'", metadata.getJdbcUrl()));
params.add(String.format("'username'='%s'", metadata.getUsername()));
params.add(String.format("'password'='%s'", metadata.getPassword()));
params.addAll(withParams);
return String.format(
"CREATE TABLE %s (%s%s) WITH (%s)",
newName, fields, primaryKey, String.join(", ", params));
}
private String getInsertIntoQuery(String... values) {
return String.format(
"INSERT INTO %s (%s) VALUES %s",
name,
getStreamFieldNames().collect(Collectors.joining(", ")),
Arrays.stream(values)
.map(v -> String.format("(%s)", v))
.collect(Collectors.joining(",")));
}
public String getInsertIntoQuery() {
return getInsertIntoQuery(
getStreamFieldNames().map(x -> "?").collect(Collectors.joining(", ")));
}
public String getSelectAllQuery() {
return String.format(
"SELECT %s FROM %s", getStreamFieldNames().collect(Collectors.joining(", ")), name);
}
protected String getDeleteFromQuery() {
return String.format("DELETE FROM %s", name);
}
public String getDropTableQuery() {
return String.format("DROP TABLE %s", name);
}
public void createTable(Connection conn) throws SQLException {
executeUpdate(conn, getCreateQuery());
}
public void insertIntoTableValues(Connection conn, String... values) throws SQLException {
executeUpdate(conn, getInsertIntoQuery(values));
}
public List<T> selectAllTable(DatabaseMetadata metadata) throws SQLException {
try (Connection conn = metadata.getConnection()) {
return selectAllTable(conn);
}
}
public List<T> selectAllTable(Connection conn) throws SQLException {
return executeStatement(conn, getSelectAllQuery(), getResultSetBuilder());
}
public void deleteTable(Connection conn) throws SQLException {
executeUpdate(conn, getDeleteFromQuery());
}
public void dropTable(Connection conn) throws SQLException {
executeUpdate(conn, getDropTableQuery());
}
protected void executeUpdate(Connection conn, String sql) throws SQLException {
try (Statement st = conn.createStatement()) {
st.executeUpdate(sql);
}
}
protected <T> List<T> executeStatement(
Connection conn, String sql, JdbcResultSetBuilder<T> rsGetter) throws SQLException {
try (Statement st = conn.createStatement();
ResultSet rs = st.executeQuery(sql)) {
return rsGetter.accept(rs);
}
}
protected <T> int[] executeStatement(
Connection conn, String sql, JdbcStatementBuilder<T> psSetter, List<T> values)
throws SQLException {
try (PreparedStatement ps = conn.prepareStatement(sql)) {
for (T value : values) {
psSetter.accept(ps, value);
ps.addBatch();
}
return ps.executeBatch();
}
}
protected <T> T getNullable(ResultSet rs, FunctionWithException<ResultSet, T, SQLException> get)
throws SQLException {
T value = get.apply(rs);
return getNullable(rs, value);
}
protected <T> T getNullable(ResultSet rs, T value) throws SQLException {
return rs.wasNull() ? null : value;
}
}