| --- |
| id: 'config' |
| title: 'Project Configuration' |
| sidebar_position: 2 |
| language: 'zh-CN' |
| --- |
| |
| import { |
| ClientOption, |
| ClientProperty, |
| ClientMemory, |
| ClientTotalMem, |
| ClientCheckpoints, |
| ClientBackend, |
| ClientFixedDelay, |
| ClientFailureRate, |
| ClientTables |
| } from '../components/TableData.jsx'; |
| |
| Configuration is very important in `StreamPark`. |
| |
| ## Why do I need to configure |
| |
| Creating a `DataStream` program takes roughly four steps: |
| |
| - Initialize and configure the `StreamExecutionEnvironment` |
| - Create source |
| - Create transformation |
| - Create sink |
| |
|  |
| |
| When developing a `DataStream` program, the first step is to initialize the `environment` and configure its parameters. These parameters fall into the following categories: |
| |
| * Parallelism |
| * TimeCharacteristic |
| * Checkpoint |
| * Watermark |
| * State Backend |
| * Restart Strategy |
| * Other... |
| |
| These settings are largely the same for every job, so configuring them is repetitive work that has to be done at the start of each program. |
| |
| Submit the program as follows: |
| |
| ```bash |
| flink run -m yarn-cluster -p 1 -c com.xx.Main job.jar |
| ``` |
| |
| |
| Developing a `Flink SQL` program likewise requires setting a series of environment parameters. The following example develops a Flink SQL job in Java, with the SQL statements embedded in the code: |
| |
| ```java title="Flink SQL task developed in Java" |
| |
| import org.apache.flink.table.api.EnvironmentSettings; |
| import org.apache.flink.table.api.Table; |
| import org.apache.flink.table.api.TableEnvironment; |
| |
| public class JavaTableApp { |
| |
|     public static void main(String[] args) { |
|         // Environment settings that have to be repeated in every job |
|         EnvironmentSettings bbSettings = EnvironmentSettings |
|                 .newInstance() |
|                 .useBlinkPlanner() |
|                 .build(); |
| |
|         TableEnvironment bsTableEnv = TableEnvironment.create(bbSettings); |
| |
|         // Source table: SQL concatenated into a Java string |
|         String sourceDDL = "CREATE TABLE datagen ( " + |
|                 " f_random INT, " + |
|                 " f_random_str STRING, " + |
|                 " ts AS localtimestamp, " + |
|                 " WATERMARK FOR ts AS ts " + |
|                 ") WITH ( " + |
|                 " 'connector' = 'datagen', " + |
|                 " 'rows-per-second'='10', " + |
|                 " 'fields.f_random.min'='1', " + |
|                 " 'fields.f_random.max'='5', " + |
|                 " 'fields.f_random_str.length'='10' " + |
|                 ")"; |
| |
|         bsTableEnv.executeSql(sourceDDL); |
| |
|         // Sink table: again SQL concatenated into a Java string |
|         String sinkDDL = "CREATE TABLE print_table (" + |
|                 " f_random int," + |
|                 " c_val bigint, " + |
|                 " wStart TIMESTAMP(3) " + |
|                 ") WITH ('connector' = 'print') "; |
| |
|         bsTableEnv.executeSql(sinkDDL); |
|     } |
| |
| } |
| |
| ``` |
| |
| Apart from the `EnvironmentSettings` setup, most of the remaining code is SQL concatenated into Java strings. If the business logic is complex, this becomes difficult to maintain. |
| |
| A simpler approach is needed, one that reduces the environment initialization and startup parameters in both `DataStream` and `Flink SQL` jobs. For a `Flink SQL` job, ideally no code is written at all, and no large blocks of SQL are embedded in code. Can this be solved in a more elegant way? |
| |
| **Absolutely** |
| |
| `StreamPark` proposes the concept of unified program configuration: the parameters a program needs from development to deployment are written into `application.yml` in a specific format, forming a general configuration template. When the program starts, the project's configuration is passed to it and the environment is initialized from that configuration. This is the concept of the `configuration file`. |
| |
| `StreamPark` also provides a higher level of abstraction for `Flink SQL`: developers only need to define the SQL in `sql.yaml`. When the program starts, `sql.yaml` is passed to the main program, and the SQL is loaded and executed automatically. This is the concept of the `sql file`. |
| |
| ## Terms |
| |
| To make communication easier, we use the following terms. The series of parameters a program needs from development to deployment is written into a file in a specific format. This file is the **`configuration file`** of the project. |
| |
| The SQL extracted from a Flink SQL job is placed in `sql.yaml`. This file is the `sql file` of the project. |
| |
| ## Configuration file |
| |
| In StreamPark, `DataStream` jobs and `Flink SQL` jobs share the same configuration file format. In other words, one configuration file can define the settings for both `DataStream` and `Flink SQL` (for a Flink SQL job the configuration file is optional). The file must be written in `yaml` and must be valid YAML. |
| |
| The following template shows how to write the configuration file and what to pay attention to: |
| |
| ```yaml |
| |
| flink: |
|   deployment: |
|     option: |
|       target: application |
|       detached: |
|       shutdownOnAttachedExit: |
|       jobmanager: |
|     property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html |
|       $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp |
|       pipeline.name: StreamPark QuickStart App |
|       yarn.application.queue: |
|       taskmanager.numberOfTaskSlots: 1 |
|       parallelism.default: 2 |
|       jobmanager.memory: |
|         flink.size: |
|         heap.size: |
|         jvm-metaspace.size: |
|         jvm-overhead.max: |
|         off-heap.size: |
|         process.size: |
|       taskmanager.memory: |
|         flink.size: |
|         framework.heap.size: |
|         framework.off-heap.size: |
|         managed.size: |
|         process.size: |
|         task.heap.size: |
|         task.off-heap.size: |
|         jvm-metaspace.size: |
|         jvm-overhead.max: |
|         jvm-overhead.min: |
|         managed.fraction: 0.4 |
|   pipeline: |
|     auto-watermark-interval: 200ms |
|   # checkpoint |
|   execution: |
|     checkpointing: |
|       mode: EXACTLY_ONCE |
|       interval: 30s |
|       timeout: 10min |
|       unaligned: false |
|       externalized-checkpoint-retention: RETAIN_ON_CANCELLATION |
|   # state backend |
|   state: |
|     backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); versions after 1.12 accept ('hashmap', 'rocksdb') |
|     backend.incremental: true |
|     checkpoint-storage: filesystem |
|     savepoints.dir: file:///tmp/chkdir |
|     checkpoints.dir: file:///tmp/chkdir |
|   # restart strategy |
|   restart-strategy: fixed-delay # Restart strategy: one of (fixed-delay|failure-rate|none) |
|   restart-strategy.fixed-delay: |
|     attempts: 3 |
|     delay: 5000 |
|   restart-strategy.failure-rate: |
|     max-failures-per-interval: |
|     failure-rate-interval: |
|     delay: |
|   # table |
|   table: |
|     planner: blink # (blink|old|any) |
|     mode: streaming # (batch|streaming) |
| |
| ``` |
| The above is the complete set of environment-related configuration to be aware of. All of it sits under the `flink` namespace and falls into two categories: |
| * The configuration under `deployment` relates to deploying the project, that is, the resource-related parameters used when the application is started. |
| * The rest is the environment configuration that needs attention during development. |
| |
| The environment configuration that needs attention during development covers the following items: |
| |
| * `checkpoint` |
| * `watermark` |
| * `state backend` |
| * `restart-strategy` |
| * `table` |
| |
| ### Deployment |
| `deployment` contains the deployment-related parameters and configuration items, which are of two types: |
| * `option` |
| * `property` |
| #### option |
| |
| The parameters of `flink run` are configured under `option`. The currently supported parameters are as follows. |
| |
| <ClientOption></ClientOption> |
| |
| * `parallelism` (`-p`): parallelism cannot be configured under `option`; configure it under `property` instead. |
| * `class` (`-c`): the main class of the program cannot be configured under `option`; configure it under `property` instead. |
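
As a minimal sketch of this rule, the fragment below sets the parallelism and the main class under `property` instead of `option`. The nesting under `flink.deployment` is assumed from the section names of the template above, and the class and job names are the quickstart values used there; substitute your own.

```yaml
flink:
  deployment:
    option:
      target: application          # full parameter name; no -p or -c here
    property:
      # takes the place of -c (main class)
      $internal.application.main: org.apache.streampark.flink.quickstart.QuickStartApp
      pipeline.name: StreamPark QuickStart App
      # takes the place of -p (parallelism)
      parallelism.default: 2
```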
| |
| :::info Attention |
| |
| Parameters under `option` must use the full parameter name, for example `detached` rather than the short form `-d`. |
| ::: |
| |
| #### property |
| |
| The parameters under `property` are the standard `-D` parameters. They fall into two groups: |
| - Basic parameters |
| - Memory parameters |
| ##### Basic parameters |
| |
| There are many basic parameters. The five most basic parameters are as follows. |
| |
| <ClientProperty></ClientProperty> |
| |
| :::info Attention |
| `$internal.application.main` and `pipeline.name` must be set. |
| ::: |
| |
| If you need to set more parameters, please refer to [`here`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html). These parameters must be placed under `property`, and the parameter names must be correct. StreamPark resolves them automatically and applies them. |
| |
| ##### Memory parameters |
| |
| Memory has many configuration parameters. The common configurations are as follows. |
| |
| <ClientMemory></ClientMemory> |
| |
| Similarly, if you want to configure more memory parameters, please refer to [`here`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html). The memory configuration of the [`Flink process memory`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup.html), the [`jobmanager`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_jobmanager.html) and the [`taskmanager`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html) must be placed under `property` to take effect. |
| |
| ##### Configure Total Memory |
| The total process memory of Flink JVM processes consists of memory consumed by the Flink application (total Flink memory) and by the JVM to run the process. The total Flink memory consumption includes usage of JVM Heap and Off-heap (Direct or Native) memory. |
| |
| <center> |
| <img src="/doc/image/process_mem_model.svg" width="340px"/> |
| </center> |
| |
| The simplest way to set up memory in Flink is to configure either of the two following options: |
| |
| <ClientTotalMem></ClientTotalMem> |
| |
| |
| :::danger Attention |
| Explicitly configuring both total process memory and total Flink memory is not recommended. It may lead to deployment failures due to potential memory configuration conflicts. Configuring other memory components also requires caution as it can produce further configuration conflicts. |
| ::: |
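
A hedged sketch of the safer pattern: set only one total-memory option per process and leave the other one empty. The sizes are illustrative values, not recommendations, and the nesting under `flink.deployment.property` is assumed from the section names of the template above.

```yaml
flink:
  deployment:
    property:
      jobmanager.memory:
        process.size: 1600m    # total process memory only
        # flink.size:          # left unset to avoid a conflict with process.size
      taskmanager.memory:
        process.size: 4096m    # illustrative value
        # flink.size:          # left unset for the same reason
```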
| |
| ### Checkpoint |
| |
| Checkpoint configuration is simple. You can configure it as follows: |
| |
| <ClientCheckpoints></ClientCheckpoints> |
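
For reference, a sketch of the checkpoint block that reuses the keys and values from the template above. The nesting under `flink.execution.checkpointing` is assumed from that template; adjust the interval and timeout to your job.

```yaml
flink:
  execution:
    checkpointing:
      mode: EXACTLY_ONCE          # or AT_LEAST_ONCE
      interval: 30s               # how often a checkpoint is triggered
      timeout: 10min              # a checkpoint is aborted after this long
      unaligned: false
      externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
```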
| |
| ### Watermark |
| |
| For `watermark` configuration, you only need to set the watermark generation interval, `pipeline.auto-watermark-interval`. |
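
In the configuration file this is a single entry. The sketch below uses the value from the template above and assumes the same nesting under `flink.pipeline`.

```yaml
flink:
  pipeline:
    auto-watermark-interval: 200ms   # value taken from the template above
```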
| |
| ### State |
| |
| ```yaml |
| state: |
|   backend: hashmap # Special note: Flink 1.12 accepts ('jobmanager', 'filesystem', 'rocksdb'); versions after 1.12 accept ('hashmap', 'rocksdb') |
|   backend.incremental: true |
|   checkpoint-storage: filesystem |
|   savepoints.dir: file:///tmp/chkdir |
|   checkpoints.dir: file:///tmp/chkdir |
| ``` |
| These settings fall into roughly two groups: |
| * backend |
| * checkpoints |
| |
| #### backend |
| |
| The backend is used to set the configuration of the state backend. The configuration of the state backend follows the configuration rules in the [`official website document`](https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/state_backends.html). The following configurations are supported: |
| |
| <ClientBackend></ClientBackend> |
| |
| If the backend type is `rocksdb`, you may need to configure RocksDB further; refer to the official website for the available options. Note that the RocksDB options on the official website are prefixed with `state.backend`, and the current namespace is already `state.backend`. The parameter names must be correct. |
| |
| :::info Attention |
| The `value` item is a non-standard configuration. It sets the state backend type (jobmanager | filesystem | rocksdb). All other items are standard configurations that follow the specifications of the official website. |
| ::: |
| |
| |
| ### Restart Strategy |
| |
| There are three restart strategies in Flink, corresponding to the three configurations here, as follows: |
| ```yaml |
| restart-strategy: fixed-delay # Restart strategy: one of (fixed-delay|failure-rate|none) |
| restart-strategy.fixed-delay: |
|   attempts: 3 |
|   delay: 5000 |
| restart-strategy.failure-rate: |
|   max-failures-per-interval: |
|   failure-rate-interval: |
|   delay: |
| ``` |
| |
| Configure the specific restart strategy under `restart-strategy`: |
| |
| * fixed-delay |
| * failure-rate |
| * none |
| |
| #### fixed-delay |
| <ClientFixedDelay></ClientFixedDelay> |
| |
| :::tip Example |
| |
| ```yaml |
| attempts: 5 |
| delay: 3 s |
| ``` |
| That is, a task is retried at most `5` times after failure, with an interval of `3 seconds` between restarts. Once the number of failed retries reaches `5`, the task fails and exits. |
| ::: |
| |
| #### failure-rate |
| <ClientFailureRate></ClientFailureRate> |
| |
| :::tip Example |
| |
| ```yaml |
| max-failures-per-interval: 10 |
| failure-rate-interval: 5 min |
| delay: 2 s |
| ``` |
| |
| That is, the interval between restarts after a failure is `2 seconds`. If the total number of failures reaches `10` within `5 minutes`, the task fails. |
| ::: |
| |
| #### none |
| |
| With the `none` strategy the job is not restarted, so no further parameters need to be configured. |
| |
| #### Unit suffix |
| |
| Note that time intervals and rate intervals can be set with or without a unit suffix. A value without a suffix is treated as `milliseconds`. The available units are: |
| |
| * `s`: second |
| * `m`: minute |
| * `min`: minute |
| * `h`: hour |
| * `d`: day |
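
To illustrate, the two fragments below describe the same five-second delay, assuming the default-to-milliseconds rule stated above. They are shown as two separate YAML documents; use only one form in a real file.

```yaml
# Without a suffix: the value is read as milliseconds
restart-strategy.fixed-delay:
  attempts: 3
  delay: 5000
---
# With a suffix: the same delay written in seconds
restart-strategy.fixed-delay:
  attempts: 3
  delay: 5 s
```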
| |
| ### Table |
| |
| Under `table` is the configuration of Flink SQL. The currently supported configuration items and functions are as follows: |
| |
| * planner |
| * mode |
| * catalog |
| * database |
| |
| <ClientTables></ClientTables> |
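
A sketch of a `table` block using the four items listed above. `planner` and `mode` take the values from the template; the `catalog` and `database` values are assumptions chosen for illustration (they are the names of Flink's built-in default catalog and database), and the nesting under `flink.table` is assumed from the template.

```yaml
flink:
  table:
    planner: blink                 # blink | old | any
    mode: streaming                # batch | streaming
    catalog: default_catalog       # assumed value, for illustration only
    database: default_database     # assumed value, for illustration only
```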
| |
| ## SQL file |
| |
| The SQL file must be in yaml format and follow the rules for yaml files. The format for defining SQL inside it is very simple: |
| |
| ```yaml |
| sql: | |
|   CREATE TABLE datagen ( |
|     f_sequence INT, |
|     f_random INT, |
|     f_random_str STRING, |
|     ts AS localtimestamp, |
|     WATERMARK FOR ts AS ts |
|   ) WITH ( |
|     'connector' = 'datagen', |
|     -- optional options -- |
|     'rows-per-second'='5', |
|     'fields.f_sequence.kind'='sequence', |
|     'fields.f_sequence.start'='1', |
|     'fields.f_sequence.end'='1000', |
|     'fields.f_random.min'='1', |
|     'fields.f_random.max'='1000', |
|     'fields.f_random_str.length'='10' |
|   ); |
| |
|   CREATE TABLE print_table ( |
|     f_sequence INT, |
|     f_random INT, |
|     f_random_str STRING |
|   ) WITH ( |
|     'connector' = 'print' |
|   ); |
| |
|   INSERT INTO print_table SELECT f_sequence, f_random, f_random_str FROM datagen; |
| ``` |
| `sql` is the ID of this SQL entry and must be unique. The content that follows it is the SQL itself. |
| |
| :::danger Attention |
| |
| In the above content, the `|` after `sql:` is required; it preserves the formatting of the whole block. StreamPark lets you define multiple SQL statements in one block. The statements must be separated by semicolons, and each must follow the format and specification of Flink SQL. |
| ::: |
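
The uniqueness requirement suggests that one file may hold more than one entry. As a sketch under that assumption, the fragment below defines two entries with the hypothetical IDs `create_tables` and `copy_rows`; whether and how StreamPark selects an entry by ID is not covered here.

```yaml
# Hypothetical ID: table definitions, several statements separated by semicolons
create_tables: |
  CREATE TABLE datagen (
    f_random INT
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5'
  );

  CREATE TABLE print_table (
    f_random INT
  ) WITH (
    'connector' = 'print'
  );

# Hypothetical ID: a single statement under its own unique key
copy_rows: |
  INSERT INTO print_table SELECT f_random FROM datagen;
```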
| |
| ## Summary |
| |
| This chapter describes in detail how the configuration file and the SQL file are structured, which should give you a first working picture of both. Please refer to the following chapters for how to use them. |