| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.connector.jdbc.table; |
| |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions; |
| import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.table.api.DataTypes; |
| import org.apache.flink.table.api.Table; |
| import org.apache.flink.table.api.TableSchema; |
| import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; |
| import org.apache.flink.table.types.DataType; |
| import org.apache.flink.types.Row; |
| import org.apache.flink.util.CollectionUtil; |
| |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.flink.table.api.Expressions.$; |
| import static org.junit.Assert.assertEquals; |
| |
| /** IT case for lookup source of JDBC connector. */ |
| @RunWith(Parameterized.class) |
| public class JdbcLookupTableITCase extends JdbcLookupTestBase { |
| |
| private final String tableFactory; |
| private final boolean useCache; |
| |
| public JdbcLookupTableITCase(String tableFactory, boolean useCache) { |
| this.useCache = useCache; |
| this.tableFactory = tableFactory; |
| } |
| |
| @Parameterized.Parameters(name = "Table factory = {0}, use cache {1}") |
| @SuppressWarnings("unchecked,rawtypes") |
| public static Collection<Object[]> useCache() { |
| return Arrays.asList( |
| new Object[][] { |
| {"legacyFactory", true}, |
| {"legacyFactory", false}, |
| {"dynamicFactory", true}, |
| {"dynamicFactory", false} |
| }); |
| } |
| |
| @Test |
| public void testLookup() throws Exception { |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); |
| |
| Iterator<Row> collected; |
| if ("legacyFactory".equals(tableFactory)) { |
| collected = useLegacyTableFactory(env, tEnv); |
| } else { |
| collected = useDynamicTableFactory(env, tEnv); |
| } |
| List<String> result = |
| CollectionUtil.iteratorToList(collected).stream() |
| .map(Row::toString) |
| .sorted() |
| .collect(Collectors.toList()); |
| |
| List<String> expected = new ArrayList<>(); |
| expected.add("+I[1, 1, 11-c1-v1, 11-c2-v1]"); |
| expected.add("+I[1, 1, 11-c1-v1, 11-c2-v1]"); |
| expected.add("+I[1, 1, 11-c1-v2, 11-c2-v2]"); |
| expected.add("+I[1, 1, 11-c1-v2, 11-c2-v2]"); |
| expected.add("+I[2, 3, null, 23-c2]"); |
| expected.add("+I[2, 5, 25-c1, 25-c2]"); |
| expected.add("+I[3, 8, 38-c1, 38-c2]"); |
| Collections.sort(expected); |
| |
| assertEquals(expected, result); |
| } |
| |
| private Iterator<Row> useLegacyTableFactory( |
| StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { |
| Table t = |
| tEnv.fromDataStream( |
| env.fromCollection( |
| Arrays.asList( |
| new Tuple2<>(1, "1"), |
| new Tuple2<>(1, "1"), |
| new Tuple2<>(2, "3"), |
| new Tuple2<>(2, "5"), |
| new Tuple2<>(3, "5"), |
| new Tuple2<>(3, "8"))), |
| $("id1"), |
| $("id2")); |
| |
| tEnv.registerTable("T", t); |
| JdbcTableSource.Builder builder = |
| JdbcTableSource.builder() |
| .setOptions( |
| JdbcConnectorOptions.builder() |
| .setDBUrl(DB_URL) |
| .setTableName(LOOKUP_TABLE) |
| .build()) |
| .setSchema( |
| TableSchema.builder() |
| .fields( |
| new String[] {"id1", "comment1", "comment2", "id2"}, |
| new DataType[] { |
| DataTypes.INT(), |
| DataTypes.STRING(), |
| DataTypes.STRING(), |
| DataTypes.STRING() |
| }) |
| .build()); |
| JdbcLookupOptions.Builder lookupOptionsBuilder = |
| JdbcLookupOptions.builder().setMaxRetryTimes(0); |
| if (useCache) { |
| lookupOptionsBuilder.setCacheMaxSize(1000).setCacheExpireMs(1000 * 1000); |
| } |
| builder.setLookupOptions(lookupOptionsBuilder.build()); |
| tEnv.registerFunction( |
| "jdbcLookup", builder.build().getLookupFunction(t.getSchema().getFieldNames())); |
| |
| // do not use the first N fields as lookup keys for better coverage |
| String sqlQuery = |
| "SELECT id1, id2, comment1, comment2 FROM T, " |
| + "LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, comment1, comment2, l_id2)"; |
| return tEnv.executeSql(sqlQuery).collect(); |
| } |
| |
| private Iterator<Row> useDynamicTableFactory( |
| StreamExecutionEnvironment env, StreamTableEnvironment tEnv) { |
| Table t = |
| tEnv.fromDataStream( |
| env.fromCollection( |
| Arrays.asList( |
| new Tuple2<>(1, "1"), |
| new Tuple2<>(1, "1"), |
| new Tuple2<>(2, "3"), |
| new Tuple2<>(2, "5"), |
| new Tuple2<>(3, "5"), |
| new Tuple2<>(3, "8"))), |
| $("id1"), |
| $("id2"), |
| $("proctime").proctime()); |
| |
| tEnv.createTemporaryView("T", t); |
| |
| String cacheConfig = ", 'lookup.cache.max-rows'='4', 'lookup.cache.ttl'='10000'"; |
| tEnv.executeSql( |
| String.format( |
| "create table lookup (" |
| + " id1 INT," |
| + " comment1 VARCHAR," |
| + " comment2 VARCHAR," |
| + " id2 VARCHAR" |
| + ") with(" |
| + " 'connector'='jdbc'," |
| + " 'url'='" |
| + DB_URL |
| + "'," |
| + " 'table-name'='" |
| + LOOKUP_TABLE |
| + "'," |
| + " 'lookup.max-retries' = '0'" |
| + " %s)", |
| useCache ? cacheConfig : "")); |
| |
| // do not use the first N fields as lookup keys for better coverage |
| String sqlQuery = |
| "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " |
| + "JOIN lookup for system_time as of source.proctime AS L " |
| + "ON source.id1 = L.id1 and source.id2 = L.id2"; |
| return tEnv.executeSql(sqlQuery).collect(); |
| } |
| } |