blob: 61c82f23e072737cbf2120df48d3215a8bd287d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.sort.parser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Tests Flink SQL parsing for {@link SqlServerLoadNode} and {@link SqlServerExtractNode}.
 *
 * <p>Each test wires one extract node to one load node, hands the resulting
 * {@link GroupInfo} to {@link FlinkSqlParser} and asserts that the parsed result
 * reports a successful {@code tryExecute()}.
 */
public class SqlServerNodeSqlParseTest extends AbstractTestBase {

    /**
     * Builds the field list shared by the SQL Server extract node and the Kafka load node.
     *
     * <p>Fresh {@link FieldInfo} instances are created on every call, so two nodes built
     * from this helper never share field objects.
     *
     * @return a fixed-size list of the seven source fields
     */
    private List<FieldInfo> buildSqlServerSourceFields() {
        return Arrays.asList(
                new FieldInfo("id", new LongFormatInfo()),
                new FieldInfo("val_char", new StringFormatInfo()),
                new FieldInfo("proctime", new TimestampFormatInfo()),
                new FieldInfo("database_name", new StringFormatInfo()),
                new FieldInfo("table_name", new StringFormatInfo()),
                new FieldInfo("op_ts", new TimestampFormatInfo()),
                new FieldInfo("schema_name", new StringFormatInfo()));
    }

    /**
     * Builds the MySQL extract node used as the source of the load test.
     *
     * @param id node id
     * @return a MySQL extract node exposing the fields {@code id} and {@code name}
     */
    private MySqlExtractNode buildMySQLExtractNode(String id) {
        List<FieldInfo> fields = Arrays.asList(
                new FieldInfo("id", new LongFormatInfo()),
                new FieldInfo("name", new StringFormatInfo()));
        // NOTE(review): "append-mode" was originally described here as a hive load option;
        // confirm that it is still meaningful for a MySQL extract node.
        Map<String, String> properties = new HashMap<>();
        properties.put("append-mode", "true");
        return new MySqlExtractNode(id, "mysql", fields,
                null, properties, "id",
                Collections.singletonList("work1"), "localhost", "root", "password",
                "inlong", null, null,
                null, null);
    }

    /**
     * Builds the SQL Server extract node used as the source of the extract test.
     *
     * @param id node id
     * @return a SQL Server extract node exposing the shared source fields
     */
    private SqlServerExtractNode buildSqlServerExtractNode(String id) {
        List<FieldInfo> fields = buildSqlServerSourceFields();
        return new SqlServerExtractNode(id, "sqlserver_out", fields, null, null,
                null, "localhost", 1433, "SA", "INLONG*123",
                "column_type_test", "dbo", "full_types", null);
    }

    /**
     * Builds the Kafka load node used as the sink of the extract test.
     *
     * <p>Every source field is mapped to a sink field of the same name and format.
     * The input and output side of each relation are deliberately separate
     * {@link FieldInfo} instances, and separate from the node's own field list.
     *
     * @param id node id
     * @return a Kafka load node using {@link JsonFormat}
     */
    private KafkaLoadNode buildKafkaNode(String id) {
        List<FieldInfo> fields = buildSqlServerSourceFields();
        List<FieldRelation> relations = Arrays.asList(
                new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                        new FieldInfo("id", new LongFormatInfo())),
                new FieldRelation(new FieldInfo("val_char", new StringFormatInfo()),
                        new FieldInfo("val_char", new StringFormatInfo())),
                new FieldRelation(new FieldInfo("proctime", new TimestampFormatInfo()),
                        new FieldInfo("proctime", new TimestampFormatInfo())),
                new FieldRelation(new FieldInfo("database_name", new StringFormatInfo()),
                        new FieldInfo("database_name", new StringFormatInfo())),
                new FieldRelation(new FieldInfo("table_name", new StringFormatInfo()),
                        new FieldInfo("table_name", new StringFormatInfo())),
                new FieldRelation(new FieldInfo("op_ts", new TimestampFormatInfo()),
                        new FieldInfo("op_ts", new TimestampFormatInfo())),
                new FieldRelation(new FieldInfo("schema_name", new StringFormatInfo()),
                        new FieldInfo("schema_name", new StringFormatInfo())));
        return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
                "sqlserver", "localhost:9092",
                new JsonFormat(), null,
                null, "id");
    }

    /**
     * Builds the SQL Server load node used as the sink of the load test.
     *
     * @param id node id
     * @return a SQL Server load node with the fields {@code id} and {@code name}
     */
    private SqlServerLoadNode buildSqlServerLoadNode(String id) {
        List<FieldInfo> fields = Arrays.asList(
                new FieldInfo("id", new LongFormatInfo()),
                new FieldInfo("name", new StringFormatInfo()));
        List<FieldRelation> relations = Arrays.asList(
                new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                        new FieldInfo("id", new LongFormatInfo())),
                new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                        new FieldInfo("name", new StringFormatInfo())));
        // Dirty-data side-output options; a LinkedHashMap keeps them in insertion order.
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("dirty.side-output.connector", "log");
        properties.put("dirty.ignore", "true");
        properties.put("dirty.side-output.enable", "true");
        properties.put("dirty.side-output.format", "csv");
        properties.put("dirty.side-output.labels",
                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_sqlserver");
        return new SqlServerLoadNode(id, "sqlserver_out", fields, relations, null, null, 1,
                properties, "jdbc:sqlserver://localhost:1433;databaseName=column_type_test", "SA",
                "INLONG*123", "dbo", "work1", "id");
    }

    /**
     * Builds a node relation that connects the given inputs to the given outputs by node id.
     *
     * @param inputs upstream nodes
     * @param outputs downstream nodes
     * @return a relation holding the ids of both sides, in list order
     */
    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
        return new NodeRelation(inputIds, outputIds);
    }

    /**
     * Creates the streaming table environment shared by all tests in this class:
     * Blink planner, streaming mode, parallelism 1 and checkpointing enabled with
     * an interval argument of 10000.
     *
     * @return a new {@link StreamTableEnvironment}
     */
    private StreamTableEnvironment createTableEnvironment() {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        return StreamTableEnvironment.create(env, settings);
    }

    /**
     * Wraps a single extract-to-load pipeline into a {@link GroupInfo}.
     *
     * <p>The group has id {@code "1"} and contains one stream with id {@code "1L"},
     * whose only relation links {@code extractNode} to {@code loadNode}.
     *
     * @param extractNode source node of the pipeline
     * @param loadNode sink node of the pipeline
     * @return the group description to feed into {@link FlinkSqlParser}
     */
    private GroupInfo buildGroupInfo(Node extractNode, Node loadNode) {
        NodeRelation relation = buildNodeRelation(
                Collections.singletonList(extractNode),
                Collections.singletonList(loadNode));
        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(extractNode, loadNode),
                Collections.singletonList(relation));
        return new GroupInfo("1", Collections.singletonList(streamInfo));
    }

    /**
     * Test extract data from mysql and load data to sqlserver.
     *
     * @throws Exception if parsing or execution fails unexpectedly
     */
    @Test
    public void testSqlServerLoad() throws Exception {
        StreamTableEnvironment tableEnv = createTableEnvironment();
        Node mySqlExtractNode = buildMySQLExtractNode("1");
        Node sqlServerLoadNode = buildSqlServerLoadNode("2");
        GroupInfo groupInfo = buildGroupInfo(mySqlExtractNode, sqlServerLoadNode);
        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
        Assert.assertTrue(parser.parse().tryExecute());
    }

    /**
     * Test extract data from sqlserver and load data to kafka.
     *
     * @throws Exception if parsing or execution fails unexpectedly
     */
    @Test
    public void testSqlServerExtract() throws Exception {
        StreamTableEnvironment tableEnv = createTableEnvironment();
        Node sqlServerExtractNode = buildSqlServerExtractNode("1");
        Node kafkaLoadNode = buildKafkaNode("2");
        GroupInfo groupInfo = buildGroupInfo(sqlServerExtractNode, kafkaLoadNode);
        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
        Assert.assertTrue(parser.parse().tryExecute());
    }
}