---
id: 'config'
title: 'Project Configuration'
sidebar_position: 2
language: 'zh-CN'
---
import {
ClientOption,
ClientProperty,
ClientMemory,
ClientTotalMem,
ClientCheckpoints,
ClientBackend,
ClientFixedDelay,
ClientFailureRate,
ClientTables
} from '../components/TableData.jsx';
Configuration is central to `StreamPark`: it defines how a job's environment is initialized and how the job is deployed.
## Why configuration is needed
Creating a `DataStream` program takes roughly four steps:
- Initialize and configure the `StreamExecutionEnvironment`
- Create the source
- Create the transformations
- Create the sink
![](/doc/image/process_steps.png)
When developing a `DataStream` program, the first step is to initialize the `environment` and configure its parameters. These parameters fall into the following categories:
* Parallelism
* TimeCharacteristic
* Checkpoint
* Watermark
* State Backend
* Restart Strategy
* Other...
These settings are largely the same for every job, so configuring them in the first step becomes repetitive work.
Submit the program as follows:
```bash
flink run -m yarn-cluster -p 1 -c com.xx.Main job.jar
```
A `Flink SQL` program also needs a series of environment parameters. The following example develops a job with pure SQL embedded in Java code:
```java title="Flink SQL task developed in Java code"
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JavaTableApp {

    public static void main(String[] args) {
        // Environment settings that must be initialized before any SQL runs
        EnvironmentSettings bsSettings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .build();

        TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        // Source table DDL, spliced together as a Java string
        String sourceDDL = "CREATE TABLE datagen ( " +
                " f_random INT, " +
                " f_random_str STRING, " +
                " ts AS localtimestamp, " +
                " WATERMARK FOR ts AS ts " +
                ") WITH ( " +
                " 'connector' = 'datagen', " +
                " 'rows-per-second'='10', " +
                " 'fields.f_random.min'='1', " +
                " 'fields.f_random.max'='5', " +
                " 'fields.f_random_str.length'='10' " +
                ")";

        bsTableEnv.executeSql(sourceDDL);

        // Sink table DDL, again spliced together as a Java string
        String sinkDDL = "CREATE TABLE print_table (" +
                " f_random int," +
                " c_val bigint, " +
                " wStart TIMESTAMP(3) " +
                ") WITH ('connector' = 'print') ";

        bsTableEnv.executeSql(sinkDDL);
    }
}
```
Apart from the `EnvironmentSettings` setup, most of the remaining code concatenates SQL strings in Java. For complex business logic, this quickly becomes hard to maintain.
A simpler approach would reduce the environment initialization and startup parameters in `DataStream` and `Flink SQL` jobs. For a `Flink SQL` job, ideally no code is written at all, and no large blocks of SQL are embedded in code. Can this be solved in a more elegant way?
**Absolutely**
`StreamPark` proposes the concept of unified program configuration. The parameters a program needs from development to deployment are written to `application.yml` in a specific format, forming a general configuration template. When the program starts, the project's configuration is passed to it, and the environment is initialized from that configuration. This is the concept of the `configuration file`.
`StreamPark` also provides a higher level of abstraction for `Flink SQL`: developers only need to define their SQL in `sql.yaml`. When the program starts, `sql.yaml` is passed to the main program, and the SQL is loaded and executed automatically. This is the concept of the `sql file`.
## Terms
For clarity, the following terms are used throughout. The file in which a program's parameters from development to deployment are configured, in a specific format, is the project's **`configuration file`**.
The SQL extracted from a Flink SQL job is placed in `sql.yaml`; this file is the project's `sql file`.
## Configuration file
In StreamPark, `DataStream` jobs and `Flink SQL` jobs share the same configuration file format. In other words, one configuration file can define the settings of both `DataStream` and `Flink SQL` jobs (for a Flink SQL job, the configuration file is optional). The file must be written in `yaml` and must be valid YAML.
The following shows how to write this configuration file and what to pay attention to:
```yaml
flink:
  deployment:
    option:
      target: application
      detached:
      shutdownOnAttachedExit:
      jobmanager:
    property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
      $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
      pipeline.name: StreamPark QuickStart App
      yarn.application.queue:
      taskmanager.numberOfTaskSlots: 1
      parallelism.default: 2
      jobmanager.memory:
        flink.size:
        heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        off-heap.size:
        process.size:
      taskmanager.memory:
        flink.size:
        framework.heap.size:
        framework.off-heap.size:
        managed.size:
        process.size:
        task.heap.size:
        task.off-heap.size:
        jvm-metaspace.size:
        jvm-overhead.max:
        jvm-overhead.min:
        managed.fraction: 0.4
  pipeline:
    auto-watermark-interval: 200ms
  # checkpoint
  execution:
    checkpointing:
      mode: EXACTLY_ONCE
      interval: 30s
      timeout: 10min
      unaligned: false
      externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
  # state backend
  state:
    backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb')
    backend.incremental: true
    checkpoint-storage: filesystem
    savepoints.dir: file:///tmp/chkdir
    checkpoints.dir: file:///tmp/chkdir
  # restart strategy
  restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
  restart-strategy.fixed-delay:
    attempts: 3
    delay: 5000
  restart-strategy.failure-rate:
    max-failures-per-interval:
    failure-rate-interval:
    delay:
  # table
  table:
    planner: blink # (blink|old|any)
    mode: streaming # (batch|streaming)
```
The above is the complete set of environment-related configuration. All of it sits under the `flink` namespace and falls into two categories:
* The configuration under `deployment` relates to project deployment, that is, the resource-related parameters used when the application is started.
* The remaining configuration relates to the environment used during development.
The development-related configuration covers the following items:
* `checkpoint`
* `watermark`
* `state backend`
* `restart-strategy`
* `table`
### Deployment
Deployment-related parameters and configuration items are placed under `deployment`, in two groups:
* `option`
* `property`
#### option
The parameters of `flink run` are configured under `option`. The currently supported parameters are as follows.
<ClientOption></ClientOption>
`parallelism` (-p): parallelism cannot be configured in `option`; configure it in `property` instead.
`class` (-c): the main class of the program cannot be configured in `option`; configure it in `property` instead.
:::info Attention
Every parameter in `option` must be written with its full (long) parameter name.
:::
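As a rough illustration of why full parameter names matter, the sketch below turns the entries of an `option` block into long-form `flink run` arguments. `OptionArgs` and `toArgs` are hypothetical names, and the handling of empty and boolean values is an assumption of this sketch, not StreamPark's actual behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: turns entries of the `option` block into long-form CLI arguments. */
final class OptionArgs {

    /**
     * Builds "--name value" arguments. Assumptions of this sketch: empty values
     * are skipped, "true" marks a flag without a value, and "false" omits the flag.
     */
    static List<String> toArgs(Map<String, String> option) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : option.entrySet()) {
            String value = e.getValue() == null ? "" : e.getValue().trim();
            if (value.isEmpty() || value.equalsIgnoreCase("false")) {
                continue; // unset option or disabled flag
            }
            args.add("--" + e.getKey()); // the key is used as-is, so it must be the long name
            if (!value.equalsIgnoreCase("true")) {
                args.add(value); // option that carries a value
            }
        }
        return args;
    }

    public static void main(String[] argv) {
        Map<String, String> option = new LinkedHashMap<>();
        option.put("target", "application");
        option.put("detached", "true");
        option.put("jobmanager", ""); // left empty in the file
        System.out.println(toArgs(option)); // [--target, application, --detached]
    }
}
```

Because each key is passed through unchanged, a long name such as `detached` maps directly to `--detached`, and no lookup table from short to long names is needed.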
#### property
The parameters under `property` are standard `-D` parameters. They fall into two groups:
- Basic parameters
- Memory parameters
##### Basic parameters
There are many basic parameters. The five most basic parameters are as follows.
<ClientProperty></ClientProperty>
:::info Attention
`$internal.application.main` and `pipeline.name` must be set.
:::
If you need to set more parameters, refer to [`here`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html). These parameters must be placed under `property`, and the parameter names must be correct. StreamPark parses them automatically and applies them.
##### Memory parameters
Memory has many configuration parameters. The common configurations are as follows.
<ClientMemory></ClientMemory>
Similarly, if you want to configure more memory parameters, refer to [`here`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html). Place the memory configuration of [`Flink process memory`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html), [`jobmanager`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_jobmanager.html) and [`taskmanager`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html) under `property` to ensure that it takes effect.
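In the sample configuration file, memory keys are nested (for example `process.size` under `taskmanager.memory`), while Flink itself expects flat dotted keys. The following minimal sketch shows one way such nesting could be flattened into `-Dkey=value` form. `PropertyFlattener` is a hypothetical name, and skipping empty values is an assumption of this sketch; it is not StreamPark's actual parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: flattens nested `property` entries into dotted Flink keys. */
final class PropertyFlattener {

    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    private static void collect(String prefix, Map<?, ?> node, Map<String, String> out) {
        for (Map.Entry<?, ?> e : node.entrySet()) {
            String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                collect(key, (Map<?, ?>) value, out); // descend into the nested block
            } else if (value != null && !value.toString().trim().isEmpty()) {
                out.put(key, value.toString().trim()); // keep only keys that have a value
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> memory = new LinkedHashMap<>();
        memory.put("process.size", "2g");
        memory.put("managed.fraction", 0.4);
        memory.put("task.heap.size", null); // left empty in the file

        Map<String, Object> property = new LinkedHashMap<>();
        property.put("parallelism.default", 2);
        property.put("taskmanager.memory", memory);

        // Render each flattened entry in the -Dkey=value form
        flatten(property).forEach((k, v) -> System.out.println("-D" + k + "=" + v));
    }
}
```

Under these assumptions, `taskmanager.memory` plus `process.size` becomes `taskmanager.memory.process.size`, which is why the nested parameter names must match the official Flink keys exactly.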
##### Configure Total Memory
The total process memory of Flink JVM processes consists of memory consumed by the Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap and Off-heap (Direct or Native) memory.
<center>
<img src="/doc/image/process_mem_model.svg" width="340px"/>
</center>
The simplest way to set up memory in Flink is to configure either of the two following options:
<ClientTotalMem></ClientTotalMem>
:::danger Attention
Explicitly configuring both total process memory and total Flink memory is not recommended. It may lead to deployment failures due to potential memory configuration conflicts. Configuring other memory components also requires caution as it can produce further configuration conflicts.
:::
### Checkpoint
Checkpoint configuration is simple. You can configure it as follows:
<ClientCheckpoints></ClientCheckpoints>
### Watermark
For `watermark` configuration, you only need to set the watermark generation interval, `pipeline.auto-watermark-interval`.
### State
```yaml
state:
  backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb')
  backend.incremental: true
  checkpoint-storage: filesystem
  savepoints.dir: file:///tmp/chkdir
  checkpoints.dir: file:///tmp/chkdir
```
The state configuration falls roughly into two groups:
* backend
* checkpoints
#### backend
The backend is used to set the configuration of the state backend. The configuration of the state backend follows the configuration rules in the [`official website document`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html). The following configurations are supported:
<ClientBackend></ClientBackend>
If the backend type is `rocksdb`, you may need to configure RocksDB further; refer to the official website for the available options. Note that the RocksDB options on the official website are prefixed with `state.backend`, while the configuration here already sits under the `state.backend` namespace. The parameter names must be correct.
:::info Attention
The `value` item is a non-standard configuration; it sets the state storage type (jobmanager | filesystem | rocksdb). All other items are standard configurations and follow the specifications of the official website.
:::
### Restart Strategy
There are three restart strategies in Flink, corresponding to the three configurations here, as follows:
```yaml
restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
restart-strategy.fixed-delay:
  attempts: 3
  delay: 5000
restart-strategy.failure-rate:
  max-failures-per-interval:
  failure-rate-interval:
  delay:
```
Configure the specific restart strategy under `restart-strategy`:
* fixed-delay
* failure-rate
* none
#### fixed-delay
<ClientFixedDelay></ClientFixedDelay>
:::tip Example
```yaml
attempts: 5
delay: 3 s
```
That is, a task is restarted at most `5` times after failures, with an interval of `3 seconds` before each restart. Once the number of failed retries reaches `5`, the task fails and exits.
:::
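The fixed-delay behavior described above can be sketched as a small state machine. This is a simplified model for illustration, not Flink's implementation; `FixedDelayPolicy` and `onFailure` are hypothetical names.

```java
/** Simplified model of a fixed-delay restart policy; not Flink's implementation. */
final class FixedDelayPolicy {
    private final int attempts;      // maximum number of restarts
    private final long delayMillis;  // wait before each restart
    private int restarts = 0;        // restarts granted so far

    FixedDelayPolicy(int attempts, long delayMillis) {
        this.attempts = attempts;
        this.delayMillis = delayMillis;
    }

    /** Records a failure; returns the delay before the restart, or -1 if the job should fail. */
    long onFailure() {
        if (restarts >= attempts) {
            return -1; // all retries are used up
        }
        restarts++;
        return delayMillis;
    }

    public static void main(String[] args) {
        FixedDelayPolicy policy = new FixedDelayPolicy(5, 3_000L); // attempts: 5, delay: 3 s
        for (int failure = 1; failure <= 6; failure++) {
            System.out.println("failure " + failure + " -> " + policy.onFailure());
        }
    }
}
```

In this model the first five failures each yield a restart after 3000 ms, and the sixth failure returns `-1`, meaning the job is declared failed.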
#### failure-rate
<ClientFailureRate></ClientFailureRate>
:::tip Example
```yaml
max-failures-per-interval: 10
failure-rate-interval: 5 min
delay: 2 s
```
That is, the task is restarted `2 seconds` after each failure. If the total number of failures reaches `10` within `5 minutes`, the task fails.
:::
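The failure-rate behavior can be modeled with a sliding window of failure timestamps. The sketch below is a simplified model of the semantics described above, not Flink's implementation; `FailureRatePolicy` is a hypothetical name, and timestamps are passed in explicitly so the logic stays deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified sliding-window model of a failure-rate restart policy; not Flink's implementation. */
final class FailureRatePolicy {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final long delayMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRatePolicy(int maxFailuresPerInterval, long intervalMillis, long delayMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.delayMillis = delayMillis;
    }

    /** Records a failure at nowMillis; returns the restart delay, or -1 if the job should fail. */
    long onFailure(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Forget failures that fall outside the measuring interval
        while (nowMillis - failureTimes.peekFirst() > intervalMillis) {
            failureTimes.removeFirst();
        }
        return failureTimes.size() >= maxFailuresPerInterval ? -1 : delayMillis;
    }

    public static void main(String[] args) {
        // max-failures-per-interval: 10, failure-rate-interval: 5 min, delay: 2 s
        FailureRatePolicy policy = new FailureRatePolicy(10, 300_000L, 2_000L);
        for (int i = 1; i <= 10; i++) {
            long now = i * 10_000L; // one failure every 10 seconds
            System.out.println("failure " + i + " -> " + policy.onFailure(now));
        }
    }
}
```

With failures spaced 10 seconds apart, the tenth failure still falls inside the 5-minute window, so the model returns `-1` at that point. If failures are spread out so that fewer than ten ever share a window, the job keeps restarting.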
#### none
No further parameters are needed when the job is not restarted.
#### Unit suffix
Note that time interval and frequency settings may omit the unit suffix, in which case the value is treated as `milliseconds`. The available units are:
* `s`: second
* `m`: minute
* `min`: minute
* `h`: hour
* `d`: day
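The suffix rules above can be sketched as a small conversion function. `DurationSetting.toMillis` is a hypothetical helper written for illustration; it only covers the units listed above and is not StreamPark's actual parsing code.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: converts a setting such as "3 s" or "5000" to milliseconds. */
final class DurationSetting {
    // A number, optional whitespace, then an optional unit suffix
    private static final Pattern FORMAT = Pattern.compile("(\\d+)\\s*([a-z]*)");

    static long toMillis(String raw) {
        Matcher m = FORMAT.matcher(raw.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a duration: " + raw);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "":    return amount;               // no suffix: milliseconds
            case "s":   return amount * 1_000L;
            case "m":
            case "min": return amount * 60_000L;
            case "h":   return amount * 3_600_000L;
            case "d":   return amount * 86_400_000L;
            default:
                throw new IllegalArgumentException("Unknown unit: " + m.group(2));
        }
    }

    public static void main(String[] args) {
        System.out.println(toMillis("5000"));  // 5000
        System.out.println(toMillis("3 s"));   // 3000
        System.out.println(toMillis("5 min")); // 300000
    }
}
```

So `delay: 5000` in the sample configuration and `delay: 5 s` describe the same interval under these rules.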
### Table
Under `table` is the configuration of Flink SQL. The currently supported configuration items and functions are as follows:
* planner
* mode
* catalog
* database
<ClientTables></ClientTables>
## SQL file
The SQL file must be in yaml format and follow the rules of YAML. The internal format for defining SQL is simple:
```sql
sql: |
  CREATE TABLE datagen (
    f_sequence INT,
    f_random INT,
    f_random_str STRING,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
  ) WITH (
    'connector' = 'datagen',
    -- optional options --
    'rows-per-second'='5',
    'fields.f_sequence.kind'='sequence',
    'fields.f_sequence.start'='1',
    'fields.f_sequence.end'='1000',
    'fields.f_random.min'='1',
    'fields.f_random.max'='1000',
    'fields.f_random_str.length'='10'
  );
  CREATE TABLE print_table (
    f_sequence INT,
    f_random INT,
    f_random_str STRING
  ) WITH (
    'connector' = 'print'
  );
  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
```
`sql` is the ID of the current SQL and must be unique. The content that follows it is the SQL itself.
:::danger Attention
In the above content, the `|` after `sql:` is required; it preserves the formatting of the whole block. StreamPark allows multiple SQL statements to be defined at once. Statements must be separated by semicolons, and each statement must follow the format and specification of Flink SQL.
:::
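To illustrate the semicolon rule, the sketch below splits one SQL block into individual statements. `SqlSplitter` is a hypothetical name and this is not StreamPark's actual parser. As a deliberate simplification, it only ignores semicolons inside single-quoted strings and does not treat `--` comments specially.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a multi-statement SQL block on semicolons. */
final class SqlSplitter {

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                inQuote = !inQuote; // track single-quoted string literals
            }
            if (c == ';' && !inQuote) {
                addIfNotBlank(statements, current); // statement boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        addIfNotBlank(statements, current); // trailing statement without a semicolon
        return statements;
    }

    private static void addIfNotBlank(List<String> out, StringBuilder sb) {
        String statement = sb.toString().trim();
        if (!statement.isEmpty()) {
            out.add(statement);
        }
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE print_table (f_random INT) WITH ('connector' = 'print');\n"
                + "INSERT INTO print_table SELECT 1;";
        split(script).forEach(System.out::println);
    }
}
```

Each returned statement could then be handed to the table environment one at a time, which is why a missing semicolon would merge two statements into one invalid statement.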
## Summary
This chapter described the configuration file and the SQL file in detail. Refer to the following chapters for how to use them in practice.