import ChangeLog from ‘../changelog/connector-assert.md’;

Assert

Assert sink connector

Description

A sink plugin which can assert illegal data by user defined rules

Key Features

Options

NameTypeRequiredDefault
rulesConfigMapyes-
rules.field_rulesstringyes-
rules.field_rules.field_namestring|ConfigMapyes-
rules.field_rules.field_typestringno-
rules.field_rules.field_valueConfigListno-
rules.field_rules.field_value.rule_typestringno-
rules.field_rules.field_value.rule_valuenumericno-
rules.field_rules.field_value.equals_toboolean|numeric|string|ConfigList|ConfigMapno-
rules.row_rulesstringyes-
rules.row_rules.rule_typestringno-
rules.row_rules.rule_valuestringno-
rules.catalog_table_ruleConfigMapno-
rules.catalog_table_rule.primary_key_ruleConfigMapno-
rules.catalog_table_rule.primary_key_rule.primary_key_namestringno-
rules.catalog_table_rule.primary_key_rule.primary_key_columnsConfigListno-
rules.catalog_table_rule.constraint_key_ruleConfigListno-
rules.catalog_table_rule.constraint_key_rule.constraint_key_namestringno-
rules.catalog_table_rule.constraint_key_rule.constraint_key_typestringno-
rules.catalog_table_rule.constraint_key_rule.constraint_key_columnsConfigListno-
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_namestringno-
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_typestringno-
rules.catalog_table_rule.column_ruleConfigListno-
rules.catalog_table_rule.column_rule.namestringno-
rules.catalog_table_rule.column_rule.typestringno-
rules.catalog_table_rule.column_rule.column_lengthintno-
rules.catalog_table_rule.column_rule.nullablebooleanno-
rules.catalog_table_rule.column_rule.default_valuestringno-
rules.catalog_table_rule.column_rule.commentcommentno-
rules.table-namesConfigListno-
rules.tables_configsConfigListno-
rules.tables_configs.table_pathStringno-
common-optionsno-

rules [ConfigMap]

Rule definition of user's available data. Each rule represents one field validation or row num validation.

field_rules [ConfigList]

field rules for field validation

field_name [string]

field name(string)

field_type [string | ConfigMap]

Field type declarations should adhere to this guide.

field_value [ConfigList]

A list value rule define the data value validation

rule_type [string]

The following rules are supported for now

  • NOT_NULL value can't be null
  • NULL value can be null
  • MIN define the minimum value of data
  • MAX define the maximum value of data
  • MIN_LENGTH define the minimum string length of a string data
  • MAX_LENGTH define the maximum string length of a string data
  • MIN_ROW define the minimun number of rows
  • MAX_ROW define the maximum number of rows

rule_value [numeric]

The value related to rule type. When the rule_type is MIN, MAX, MIN_LENGTH, MAX_LENGTH, MIN_ROW or MAX_ROW, users need to assign a value to the rule_value.

equals_to [boolean | numeric | string | ConfigList | ConfigMap]

equals_to is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to equals_to. These types are detailed here. For instance, if one field is a row with three fields, and the declaration of row type is {a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}, users can assign the value [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]] to equals_to.

The way of defining field values is consistent with FakeSource.

equals_to cannot be applied to null type fields. However, users can use the rule type NULL for verification, such as {rule_type = NULL}.

catalog_table_rule [ConfigMap]

Used to assert the catalog table is same with the user defined table.

table-names [ConfigList]

Used to assert the table should be in the data.

tables_configs [ConfigList]

Used to assert the multiple tables should be in the data.

table_path [String]

The path of the table.

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Example

Simple

the whole config obey with hocon style

Assert {
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 10
          },
          {
            rule_type = MIN_ROW
            rule_value = 5
          }
        ],
        field_rules = [{
          field_name = name
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            },
            {
              rule_type = MIN_LENGTH
              rule_value = 5
            },
            {
              rule_type = MAX_LENGTH
              rule_value = 10
            }
          ]
        }, {
          field_name = age
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 23
            },
            {
              rule_type = MIN
              rule_value = 32767
            },
            {
              rule_type = MAX
              rule_value = 2147483647
            }
          ]
        }
        ]
        catalog_table_rule {
            primary_key_rule = {
                primary_key_name = "primary key"
                primary_key_columns = ["id"]
            }
            constraint_key_rule = [
                        {
                        constraint_key_name = "unique_name"
                        constraint_key_type = UNIQUE_KEY
                        constraint_key_columns = [
                            {
                                constraint_key_column_name = "id"
                                constraint_key_sort_type = ASC
                            }
                        ]
                        }
            ]
            column_rule = [
               {
                name = "id"
                type = bigint
               },
              {
                name = "name"
                type = string
              },
              {
                name = "age"
                type = int
              }
            ]
        }
      }

  }

Complex

Here is a more complex example about equals_to. The example involves FakeSource. You may want to learn it, please read this document.

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_date = date
        c_timestamp = timestamp
        c_time = time
        c_bytes = bytes
        c_array = "array<int>"
        c_map = "map<time, string>"
        c_map_nest = "map<string, {c_int = int, c_string = string}>"
        c_row = {
          c_null = "null"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_date = date
          c_timestamp = timestamp
          c_time = time
          c_bytes = bytes
          c_array = "array<int>"
          c_map = "map<string, string>"
        }
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
          "bWlJWmo=",
          [0, 1, 2],
          "{ 12:01:26 = v0 }",
          { k1 = [123, "BBB-BB"]},
          [
            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
            "bWlJWmo=",
            [0, 1, 2],
            { k0 = v0 }
          ]
        ]
      }
    ]
    plugin_output = "fake"
  }
}

sink{
  Assert {
    plugin_input = "fake"
    rules =
      {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 1
          },
          {
            rule_type = MIN_ROW
            rule_value = 1
          }
        ],
        field_rules = [
            {
                field_name = c_null
                field_type = "null"
                field_value = [
                    {
                        rule_type = NULL
                    }
                ]
            },
            {
                field_name = c_string
                field_type = string
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "AAA"
                    }
                ]
            },
            {
                field_name = c_boolean
                field_type = boolean
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = false
                    }
                ]
            },
            {
                field_name = c_tinyint
                field_type = tinyint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_smallint
                field_type = smallint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 1
                    }
                ]
            },
            {
                field_name = c_int
                field_type = int
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 333
                    }
                ]
            },
            {
                field_name = c_bigint
                field_type = bigint
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 323232
                    }
                ]
            },
            {
                field_name = c_float
                field_type = float
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 3.1
                    }
                ]
            },
            {
                field_name = c_double
                field_type = double
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 9.33333
                    }
                ]
            },
            {
                field_name = c_decimal
                field_type = "decimal(30, 8)"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = 99999.99999999
                    }
                ]
            },
            {
                field_name = c_date
                field_type = date
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21"
                    }
                ]
            },
            {
                field_name = c_timestamp
                field_type = timestamp
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "2012-12-21T12:34:56"
                    }
                ]
            },
            {
                field_name = c_time
                field_type = time
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "12:34:56"
                    }
                ]
            },
            {
                field_name = c_bytes
                field_type = bytes
                field_value = [
                      {
                          rule_type = NOT_NULL
                          equals_to = "bWlJWmo="
                      }
                ]
            },
            {
                field_name = c_array
                field_type = "array<int>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [0, 1, 2]
                    }
                ]
            },
            {
                field_name = c_map
                field_type = "map<time, string>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = "{ 12:01:26 = v0 }"
                    }
                ]
            },
            {
                field_name = c_map_nest
                field_type = "map<string, {c_int = int, c_string = string}>"
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = { k1 = [123, "BBB-BB"] }
                    }
                ]
            },
            {
                field_name = c_row
                field_type = {
                    c_null = "null"
                    c_string = string
                    c_boolean = boolean
                    c_tinyint = tinyint
                    c_smallint = smallint
                    c_int = int
                    c_bigint = bigint
                    c_float = float
                    c_double = double
                    c_decimal = "decimal(30, 8)"
                    c_date = date
                    c_timestamp = timestamp
                    c_time = time
                    c_bytes = bytes
                    c_array = "array<int>"
                    c_map = "map<string, string>"
                }
                field_value = [
                    {
                        rule_type = NOT_NULL
                        equals_to = [
                           null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
                           "bWlJWmo=",
                           [0, 1, 2],
                           { k0 = v0 }
                        ]
                    }
                ]
            }
        ]
    }
  }
}

Assert Multiple Tables

check multiple tables

env {
  parallelism = 1
  job.mode = BATCH
}

source {
  FakeSource {
    tables_configs = [
      {
        row.num = 16
        schema {
          table = "test.table1"
          fields {
            c_int = int
            c_bigint = bigint
          }
        }
      },
      {
        row.num = 17
        schema {
          table = "test.table2"
          fields {
            c_string = string
            c_tinyint = tinyint
          }
        }
      }
    ]
  }
}

transform {
}

sink {
  Assert {
    rules =
      {
        tables_configs = [
          {
            table_path = "test.table1"
            row_rules = [
              {
                rule_type = MAX_ROW
                rule_value = 16
              },
              {
                rule_type = MIN_ROW
                rule_value = 16
              }
            ],
            field_rules = [{
              field_name = c_int
              field_type = int
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }, {
              field_name = c_bigint
              field_type = bigint
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }]
          },
          {
            table_path = "test.table2"
            row_rules = [
              {
                rule_type = MAX_ROW
                rule_value = 17
              },
              {
                rule_type = MIN_ROW
                rule_value = 17
              }
            ],
            field_rules = [{
              field_name = c_string
              field_type = string
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }, {
              field_name = c_tinyint
              field_type = tinyint
              field_value = [
                {
                  rule_type = NOT_NULL
                }
              ]
            }]
          }
        ]

      }
  }
}

Changelog