With the growing number of Apache InLong (incubating) users and developers, the demand for richer usage scenarios and low-cost operation has become stronger and stronger. Among these requests, adding Transform (T) to the entire InLong link received the most feedback. After research and design by the community developers @yunqingmoswu, @EMsnap, @gong, and @thexiay, the InLong Sort ETL solution based on Flink SQL has been completed. This article introduces the implementation details of the solution.
First, the choice of Apache Flink SQL as the foundation was mainly based on the following considerations:
Note: For the complete code of this solution, please refer to Apache InLong Sort; it will be available for download in the upcoming 1.2.0 release.
The main requirement of this solution is to complete the Transform (T) capability of the InLong Sort module, including:
Transform | Notes |
---|---|
Deduplication in a time window | Deduplicate data within a time window |
Time window aggregation | Aggregate data within a time window |
Time format conversion | Convert the value of a field to a string in the target time format |
Field splitting | Split a field into multiple new fields by a delimiter |
String replacement | Replace some or all of the content of a string field |
Data filtering | Discard or retain data that meets the filter conditions |
Content extraction | Extract part of a field to create a new field |
Join | Support joining two tables |
Value substitution | Given a matching value, if the field's value equals it, replace it with the target value |
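To make the row-level transforms in the table above concrete, here is a minimal, self-contained Java sketch (not InLong code; the real implementation expresses these as Flink SQL functions) of field splitting, string replacement, and value substitution:

```java
import java.util.Arrays;
import java.util.List;

// Standalone illustration of three row-level transforms from the table above.
public class TransformSketch {

    // Field splitting: split one field into multiple new fields by a delimiter.
    static List<String> segment(String field, String delimiter) {
        return Arrays.asList(field.split(delimiter));
    }

    // String replacement: replace part of a string field's content.
    static String replace(String field, String target, String replacement) {
        return field.replace(target, replacement);
    }

    // Value substitution: if the field equals the matching value, emit the target value.
    static String substitute(String field, String match, String target) {
        return field.equals(match) ? target : field;
    }

    public static void main(String[] args) {
        System.out.println(segment("a,b,c", ","));               // [a, b, c]
        System.out.println(replace("foo_bar", "_", "-"));        // foo-bar
        System.out.println(substitute("N/A", "N/A", "unknown")); // unknown
    }
}
```

In the generated Flink SQL, these transforms surface as built-in functions and expressions (e.g. `SPLIT_INDEX`, `REPLACE`, `CASE WHEN`) in the `SELECT` list.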
Users of big data integration have Transform requirements, such as data transformation, join, and filtering, in many business scenarios.
This design needs to achieve the following goals:
The core concepts below follow the terminology explained in the outline design:
Name | Meaning |
---|---|
InLong Dashboard | InLong front-end management interface |
InLong Manager Client | Wraps the Manager interfaces so that external user programs can call them without going through the front-end InLong Dashboard |
InLong Manager Openapi | Interface for calls between the InLong Manager and external systems |
InLong Manager metaData | Metadata management of the InLong Manager, including metadata at the group and stream dimensions |
InLong Manager task manager | The InLong Manager module that manages data source collection tasks: Agent task distribution, instruction delivery, and heartbeat reporting |
InLong Group | Data stream group; one group contains multiple data streams and represents one data ingestion |
InLong Stream | Data stream; a data stream has a specific flow direction |
Stream Source | A stream has corresponding collection and sink ends; this design only involves the Stream Source |
Stream Info | Abstraction of a data stream in Sort, including the sources, transformations, destinations, etc. of the stream |
Group Info | Encapsulation of data streams in Sort; one GroupInfo can contain multiple StreamInfos |
Node | Abstraction of data source, data transformation and data destination in data synchronization |
Extract Node | Source side abstraction of data synchronization |
Load Node | Destination abstraction of data synchronization |
MySQL Extract Node | MySQL data source abstraction |
Kafka Load Node | Kafka data destination abstraction |
Transform Node | Transformation process abstraction of data synchronization |
Aggregate Transform Node | Abstraction of aggregation-type transformations in data synchronization |
Node Relation | Relationship abstraction of nodes in data synchronization |
Field Relation | Abstraction of the relationship between upstream and downstream node fields in data synchronization |
Function | Abstraction of a transformation function in data synchronization |
Substring Function | Abstraction of the string substring function |
Filter Function | Abstraction of data filter function |
Function Param | Input parameter abstraction of function |
Constant Param | Constant parameters |
Field Info | Node field |
Meta FieldInfo | Node meta information field |
This design mainly involves the following entities:
Group, Stream, GroupInfo, StreamInfo, Node, NodeRelation, FieldRelation, Function, FilterFunction, SubstringFunction, FunctionParam, FieldInfo, MetaFieldInfo, MySQLExtractNode, KafkaLoadNode, etc.
For ease of understanding, this section models the entities and analyzes the relationships between them. The correspondences in the domain model are as follows:
The above relationships can be represented as a UML object-relationship diagram:
This design only adds the Flink connectors and a Flink SQL generator to the original system, and modifies the data model module.
The main modules are described below:
Name | Description |
---|---|
FlinkSQLParser | Core class for generating Flink SQL; holds a reference to GroupInfo |
GroupInfo | Sort's internal abstraction of an InLong group; encapsulates the synchronization information of the entire group, including a reference to List<StreamInfo> |
StreamInfo | Sort's internal abstraction of an InLong stream; encapsulates stream synchronization information, including references to List<Node> and List<NodeRelation> |
Node | Top-level interface of synchronization nodes; its subclass implementations encapsulate the data sources, destinations, and transformation nodes of the synchronization |
ExtractNode | Abstraction of a data extract node, inherits from Node |
LoadNode | Abstraction of a data load node, inherits from Node |
TransformNode | Abstraction of a data transformation node, inherits from Node |
NodeRelation | Defines relationships between nodes |
FieldRelation | Defines field relationships between nodes |
Function | Abstraction of a Transform (T) execution function |
FilterFunction | Function abstraction for data filtering, inherits from Function |
SubstringFunction | Function abstraction for string substring extraction, inherits from Function |
FunctionParam | Abstraction of function parameters |
ConstantParam | Encapsulation of constant function parameters, inherits from FunctionParam |
FieldInfo | Encapsulation of node fields; can also be used as a function input parameter, inherits from FunctionParam |
MetaFieldInfo | Encapsulation of built-in (metadata) fields, currently used mainly in the Canal-JSON metadata field scenario; inherits from FieldInfo |
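The node hierarchy in the table above can be sketched with a few plain Java types. This is a simplified, standalone illustration (field, format, and connector details of the real inlong-sort interfaces are omitted):

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the node hierarchy described in the module table.
public class NodeModelSketch {

    // Top-level abstraction of a synchronization node.
    interface Node {
        String getId();
        String getNodeName();
    }

    // Source-side abstraction of data synchronization.
    static abstract class ExtractNode implements Node { }

    // Destination-side abstraction of data synchronization.
    static abstract class LoadNode implements Node { }

    // Concrete stand-in for MySQLExtractNode.
    static class MySqlExtractNode extends ExtractNode {
        public String getId() { return "1"; }
        public String getNodeName() { return "mysql_input"; }
    }

    // Concrete stand-in for KafkaLoadNode.
    static class KafkaLoadNode extends LoadNode {
        public String getId() { return "2"; }
        public String getNodeName() { return "kafka_output"; }
    }

    // NodeRelation records which node ids feed which.
    static class NodeRelation {
        final List<String> inputIds;
        final List<String> outputIds;
        NodeRelation(List<String> inputIds, List<String> outputIds) {
            this.inputIds = inputIds;
            this.outputIds = outputIds;
        }
    }

    public static void main(String[] args) {
        Node source = new MySqlExtractNode();
        Node sink = new KafkaLoadNode();
        NodeRelation relation = new NodeRelation(
                Arrays.asList(source.getId()), Arrays.asList(sink.getId()));
        System.out.println(relation.inputIds + " -> " + relation.outputIds); // [1] -> [2]
    }
}
```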
The following takes synchronizing data from MySQL to Kafka as an example to describe the principle of SQL generation.
The node configuration is:
```java
private Node buildMySQLExtractNode() {
    List<FieldInfo> fields = Arrays.asList(
            new FieldInfo("name", new StringFormatInfo()),
            new FieldInfo("age", new IntFormatInfo()));
    return new MySqlExtractNode("1", "mysql_input", fields, null, null, "id",
            Collections.singletonList("tableName"), "localhost", "root",
            "password", "inlong", null, null, null, null);
}
```
The generated SQL is:
```sql
CREATE TABLE `mysql_1` (
    `name` string,
    `age` int
) WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'inlong',
    'table-name' = 'tableName'
)
```
The node configuration is:
```java
List<FilterFunction> filters = Arrays.asList(
        new SingleValueFilterFunction(EmptyOperator.getInstance(),
                new FieldInfo("age", new IntFormatInfo()),
                LessThanOperator.getInstance(), new ConstantParam(25)),
        new SingleValueFilterFunction(AndOperator.getInstance(),
                new FieldInfo("age", new IntFormatInfo()),
                MoreThanOrEqualOperator.getInstance(), new ConstantParam(18)));
```
The generated SQL is:
```sql
SELECT `name` AS `name`, `age` AS `age`
FROM `mysql_1`
WHERE `age` < 25 AND `age` >= 18
```
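Conceptually, each `SingleValueFilterFunction` is rendered as `<logic operator> <field> <comparison operator> <value>` and the results are concatenated into a `WHERE` clause in declaration order. A much-simplified, self-contained sketch of that rendering (not the actual FlinkSQLParser code):

```java
import java.util.Arrays;
import java.util.List;

public class WhereClauseSketch {

    // Simplified stand-in for SingleValueFilterFunction.
    static class Filter {
        final String logicOperator;   // "" for the first condition, "AND"/"OR" afterwards
        final String field;
        final String compareOperator; // e.g. "<", ">="
        final Object value;

        Filter(String logicOperator, String field, String compareOperator, Object value) {
            this.logicOperator = logicOperator;
            this.field = field;
            this.compareOperator = compareOperator;
            this.value = value;
        }
    }

    // Render the filters into a SQL WHERE clause, in declaration order.
    static String render(List<Filter> filters) {
        StringBuilder sb = new StringBuilder("WHERE");
        for (Filter f : filters) {
            if (!f.logicOperator.isEmpty()) {
                sb.append(' ').append(f.logicOperator);
            }
            sb.append(" `").append(f.field).append("` ")
              .append(f.compareOperator).append(' ').append(f.value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Filter> filters = Arrays.asList(
                new Filter("", "age", "<", 25),
                new Filter("AND", "age", ">=", 18));
        System.out.println(render(filters)); // WHERE `age` < 25 AND `age` >= 18
    }
}
```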
The node configuration is:
```java
private Node buildKafkaLoadNode(FilterStrategy filterStrategy) {
    List<FieldInfo> fields = Arrays.asList(
            new FieldInfo("name", new StringFormatInfo()),
            new FieldInfo("age", new IntFormatInfo()));
    List<FieldRelation> relations = Arrays.asList(
            new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                    new FieldInfo("name", new StringFormatInfo())),
            new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                    new FieldInfo("age", new IntFormatInfo())));
    List<FilterFunction> filters = Arrays.asList(
            new SingleValueFilterFunction(EmptyOperator.getInstance(),
                    new FieldInfo("age", new IntFormatInfo()),
                    LessThanOperator.getInstance(), new ConstantParam(25)),
            new SingleValueFilterFunction(AndOperator.getInstance(),
                    new FieldInfo("age", new IntFormatInfo()),
                    MoreThanOrEqualOperator.getInstance(), new ConstantParam(18)));
    return new KafkaLoadNode("2", "kafka_output", fields, relations, filters,
            filterStrategy, "topic1", "localhost:9092",
            new CanalJsonFormat(), null, null, "id");
}
```
The generated SQL is:
```sql
CREATE TABLE `kafka_3` (
    `name` string,
    `age` int
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'topic1',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'canal-json-inlong',
    'canal-json-inlong.ignore-parse-errors' = 'true',
    'canal-json-inlong.map-null-key.mode' = 'DROP',
    'canal-json-inlong.encode.decimal-as-plain-number' = 'true',
    'canal-json-inlong.timestamp-format.standard' = 'SQL',
    'canal-json-inlong.map-null-key.literal' = 'null'
)
```
For the relevant settings, see the node configurations in section 4.1.
The generated SQL is:
```sql
INSERT INTO `kafka_3`
SELECT `name` AS `name`, `age` AS `age`
FROM `mysql_1`
WHERE `age` < 25 AND `age` >= 18
```
The complete configuration of GroupInfo is as follows:
```java
private Node buildMySqlExtractNode() {
    List<FieldInfo> fields = Arrays.asList(
            new FieldInfo("name", new StringFormatInfo()),
            new FieldInfo("age", new IntFormatInfo()),
            new FieldInfo("ts", new TimestampFormatInfo()));
    WatermarkField wk = new WatermarkField(
            new FieldInfo("ts", new TimestampFormatInfo()),
            new StringConstantParam("1"),
            new TimeUnitConstantParam(TimeUnit.MINUTE));
    return new MySqlExtractNode("1", "mysql_input", fields, wk, null, "id",
            Collections.singletonList("tableName"), "localhost", "root",
            "password", "inlong", null, null, null, null);
}

private Node buildKafkaNode() {
    List<FieldInfo> fields = Arrays.asList(
            new FieldInfo("name", new StringFormatInfo()),
            new FieldInfo("age", new IntFormatInfo()),
            new FieldInfo("ts", new TimestampFormatInfo()));
    List<FieldRelation> relations = Arrays.asList(
            new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                    new FieldInfo("name", new StringFormatInfo())),
            new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                    new FieldInfo("age", new IntFormatInfo())));
    return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
            "topic", "localhost:9092", new JsonFormat(), 1, null, "id");
}

private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
    List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
    List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
    return new NodeRelation(inputIds, outputIds);
}

@Override
public GroupInfo getTestObject() {
    Node input = buildMySqlExtractNode();
    Node output = buildKafkaNode();
    StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output),
            Collections.singletonList(buildNodeRelation(
                    Collections.singletonList(input),
                    Collections.singletonList(output))));
    return new GroupInfo("1", Collections.singletonList(streamInfo));
}
```
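When a GroupInfo like the one above is parsed, SQL is emitted in dependency order, as the earlier sections show: source `CREATE TABLE` first, then sink `CREATE TABLE`, then the `INSERT INTO ... SELECT` that links them. A much-simplified, self-contained sketch of that ordering (the real parser also renders fields, filters, watermarks, and formats):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of how a parser could walk a NodeRelation and emit SQL in
// dependency order: source DDL, sink DDL, then the linking INSERT.
public class ParseOrderSketch {

    static List<String> parse(Map<String, String> createStatements,
                              List<String> inputIds, List<String> outputIds) {
        List<String> sqls = new ArrayList<>();
        // 1. CREATE TABLE for every input (extract) node
        for (String id : inputIds) {
            sqls.add(createStatements.get(id));
        }
        // 2. CREATE TABLE for every output (load) node
        for (String id : outputIds) {
            sqls.add(createStatements.get(id));
        }
        // 3. INSERT INTO <output> SELECT ... FROM <input>
        for (String out : outputIds) {
            for (String in : inputIds) {
                sqls.add("INSERT INTO `" + out + "` SELECT * FROM `" + in + "`");
            }
        }
        return sqls;
    }

    public static void main(String[] args) {
        Map<String, String> ddl = new HashMap<>();
        ddl.put("mysql_1", "CREATE TABLE `mysql_1` (...)");
        ddl.put("kafka_3", "CREATE TABLE `kafka_3` (...)");
        parse(ddl, Arrays.asList("mysql_1"), Arrays.asList("kafka_3"))
                .forEach(System.out::println);
    }
}
```

The emitted statements are then submitted to the Flink table environment, which compiles them into the running synchronization job.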