blob: c1bb4d8f97b3e73b8597f2b457fafb5c6f6d769e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.testutils.tables.templates;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.testutils.TableManaged;
import org.apache.flink.connector.jdbc.testutils.functions.JdbcResultSetBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableBase;
import org.apache.flink.connector.jdbc.testutils.tables.TableField;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.lang.String.format;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
import static org.apache.flink.table.api.DataTypes.DOUBLE;
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.VARCHAR;
/**
 * Book table template.
 *
 * <p>Declares a table with the columns {@code id} (primary key, {@code INT NOT NULL}), {@code
 * title} and {@code author} ({@code VARCHAR(50)}), {@code price} (database type {@code FLOAT},
 * declared as {@code DOUBLE}) and {@code qty} ({@code INT}), together with the builders that map
 * {@link BookEntry} objects to JDBC statement parameters and result set rows back to entries.
 */
public class BooksTable extends TableBase<BooksTable.BookEntry> implements TableManaged {

    /**
     * Binds a {@link BookEntry} to statement parameters 1 to 5 in column order (id, title,
     * author, price, qty).
     *
     * <p>{@code price} and {@code qty} are boxed and their columns are declared without a
     * NOT NULL constraint, so a {@code null} value is bound as SQL NULL instead of being
     * auto-unboxed (which would throw a {@link NullPointerException}). {@code id} is the NOT NULL
     * primary key and is bound directly.
     */
    private final JdbcStatementBuilder<BookEntry> statementBuilder =
            (ps, book) -> {
                ps.setInt(1, book.id);
                ps.setString(2, book.title);
                ps.setString(3, book.author);
                if (book.price == null) {
                    ps.setNull(4, Types.DOUBLE);
                } else {
                    ps.setDouble(4, book.price);
                }
                if (book.qty == null) {
                    ps.setNull(5, Types.INTEGER);
                } else {
                    ps.setInt(5, book.qty);
                }
            };

    /**
     * Converts every remaining row of a result set into a {@link BookEntry}, reading columns 1 to
     * 5 by position.
     *
     * <p>Each column is read through {@code getNullable}, which is defined outside this file;
     * presumably it yields {@code null} when the column value was SQL NULL (to be confirmed
     * against its definition).
     */
    private final JdbcResultSetBuilder<BookEntry> resultSetBuilder =
            (rs) -> {
                List<BookEntry> result = new ArrayList<>();
                while (rs.next()) {
                    result.add(
                            new BookEntry(
                                    getNullable(rs, r -> r.getInt(1)),
                                    getNullable(rs, r -> r.getString(2)),
                                    getNullable(rs, r -> r.getString(3)),
                                    getNullable(rs, r -> r.getDouble(4)),
                                    getNullable(rs, r -> r.getInt(5))));
                }
                return result;
            };

    /**
     * Creates the book table template.
     *
     * @param name the table name passed to the base class
     */
    public BooksTable(String name) {
        super(
                name,
                Arrays.asList(
                                pkField("id", INT().notNull()),
                                field("title", VARCHAR(50)),
                                field("author", VARCHAR(50)),
                                field("price", dbType("FLOAT"), DOUBLE()),
                                field("qty", INT()))
                        .toArray(new TableField[0]));
    }

    /**
     * Returns the select-all query restricted to an id range.
     *
     * @return the select-all query followed by {@code WHERE id BETWEEN ? AND ?}; the two
     *     placeholders are the range bounds
     */
    public String getSelectByIdBetweenQuery() {
        return format("%s WHERE id BETWEEN ? AND ?", getSelectAllQuery());
    }

    /**
     * Returns the select-all query restricted to a single author.
     *
     * @return the select-all query followed by {@code WHERE author = ?}
     */
    public String getSelectByAuthorQuery() {
        return format("%s WHERE author = ?", getSelectAllQuery());
    }

    /**
     * Returns the select-all query restricted to rows with a negative quantity.
     *
     * @return the select-all query followed by {@code WHERE QTY < 0}
     */
    public String getSelectAllNoQuantityQuery() {
        return format("%s WHERE QTY < 0", getSelectAllQuery());
    }

    /**
     * Returns the builder that binds a {@link BookEntry} to a prepared statement.
     *
     * @return the statement builder of this table
     */
    public JdbcStatementBuilder<BookEntry> getStatementBuilder() {
        return statementBuilder;
    }

    /**
     * Returns the builder that converts result set rows into {@link BookEntry} objects.
     *
     * @return the result set builder of this table
     */
    @Override
    protected JdbcResultSetBuilder<BookEntry> getResultSetBuilder() {
        return resultSetBuilder;
    }

    /**
     * Runs the select-all query and converts the rows into entries.
     *
     * @param conn the connection used to execute the query
     * @return the entries produced by the result set builder
     * @throws SQLException if executing the statement fails
     */
    public List<BookEntry> selectAllTable(Connection conn) throws SQLException {
        return executeStatement(conn, getSelectAllQuery(), resultSetBuilder);
    }

    /** Book table entry; an immutable value holder with one field per table column. */
    public static class BookEntry implements Serializable {
        public final Integer id;
        public final String title;
        public final String author;
        public final Double price;
        public final Integer qty;

        /**
         * Creates an entry.
         *
         * @param id the book id
         * @param title the book title
         * @param author the book author
         * @param price the book price, may be {@code null}
         * @param qty the quantity, may be {@code null}
         */
        public BookEntry(Integer id, String title, String author, Double price, Integer qty) {
            this.id = id;
            this.title = title;
            this.author = author;
            this.price = price;
            this.qty = qty;
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                    .append("id", id)
                    .append("title", title)
                    .append("author", author)
                    .append("price", price)
                    .append("qty", qty)
                    .toString();
        }

        /** Two entries are equal when they have the same class and all five fields are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            BookEntry bookEntry = (BookEntry) o;
            return new EqualsBuilder()
                    .append(id, bookEntry.id)
                    .append(title, bookEntry.title)
                    .append(author, bookEntry.author)
                    .append(price, bookEntry.price)
                    .append(qty, bookEntry.qty)
                    .isEquals();
        }

        /** Hash code over the same five fields used by {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return new HashCodeBuilder(17, 37)
                    .append(id)
                    .append(title)
                    .append(author)
                    .append(price)
                    .append(qty)
                    .toHashCode();
        }
    }
}