Data validation transform plugin
The DataValidator transform validates field values according to configured rules and handles validation failures based on the specified error handling strategy. It supports multiple validation rule types including null checks, range validation, length validation, and regex pattern matching.
name | type | required | default value |
---|---|---|---|
row_error_handle_way | enum | no | FAIL |
row_error_handle_way.error_table | string | no | |
field_rules | array | yes | |
row_error_handle_way [enum]
Error handling strategy when validation fails:
- FAIL: Fail the entire task when validation errors occur
- SKIP: Skip invalid rows and continue processing
- ROUTE_TO_TABLE: Route invalid data to a specified error table
Note: ROUTE_TO_TABLE mode only works with sinks that support multiple tables. The sink must be capable of handling data routed to different table destinations.
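As a rough illustration of how the three strategies differ, the sketch below applies a validity check to a list of rows under each mode. It is not SeaTunnel's implementation: the ErrorHandlingSketch class, the ErrorHandleWay enum, and the apply helper are hypothetical names, rows are modeled as plain strings, and destination tables are modeled as map keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these names come from SeaTunnel.
class ErrorHandlingSketch {

    enum ErrorHandleWay { FAIL, SKIP, ROUTE_TO_TABLE }

    // Returns the rows grouped by destination table name.
    static Map<String, List<String>> apply(
            List<String> rows,
            Predicate<String> isValid,
            ErrorHandleWay way,
            String mainTable,
            String errorTable) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        out.put(mainTable, new ArrayList<>());
        for (String row : rows) {
            if (isValid.test(row)) {
                out.get(mainTable).add(row);
                continue;
            }
            switch (way) {
                case FAIL:
                    // Abort the whole run on the first invalid row.
                    throw new IllegalStateException("Validation failed for row: " + row);
                case SKIP:
                    // Drop the invalid row and keep processing.
                    break;
                case ROUTE_TO_TABLE:
                    // Send the invalid row to the error table instead.
                    out.computeIfAbsent(errorTable, k -> new ArrayList<>()).add(row);
                    break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("alice", "", "bob");
        Predicate<String> notEmpty = r -> !r.isEmpty();
        System.out.println(apply(rows, notEmpty, ErrorHandleWay.SKIP,
                "validated_table", "error_data"));
        System.out.println(apply(rows, notEmpty, ErrorHandleWay.ROUTE_TO_TABLE,
                "validated_table", "error_data"));
    }
}
```

Only the ROUTE_TO_TABLE branch produces a second destination, which is why that mode needs a sink that can write to more than one table.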
row_error_handle_way.error_table [string]
Target table name for routing invalid data when row_error_handle_way is set to ROUTE_TO_TABLE. This parameter is required when using ROUTE_TO_TABLE mode.
When using ROUTE_TO_TABLE mode, DataValidator automatically creates an error table with a fixed schema to store validation failure data. The error table contains the following fields:
Field Name | Data Type | Description |
---|---|---|
source_table_id | STRING | Source table identifier that identifies the originating table |
source_table_path | STRING | Source table path with complete table path information |
original_data | STRING | JSON representation of the original data containing the complete row that failed validation |
validation_errors | STRING | JSON array of validation error details containing all failed fields and error information |
create_time | TIMESTAMP | Creation time of the validation error |
Complete Error Table Record Example:
{
  "source_table_id": "users_table",
  "source_table_path": "database.users",
  "original_data": "{\"id\": 123, \"name\": null, \"age\": 200, \"email\": \"invalid-email\"}",
  "validation_errors": "[{\"field_name\": \"name\", \"error_message\": \"Field 'name' cannot be null\"}, {\"field_name\": \"age\", \"error_message\": \"Field 'age' value 200 is not within range [0, 150]\"}, {\"field_name\": \"email\", \"error_message\": \"Field 'email' does not match pattern '^[\\\\w-\\\\.]+@([\\\\w-]+\\\\.)+[\\\\w-]{2,4}$'\"}]",
  "create_time": "2024-01-15T10:30:45"
}
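Data Routing Mechanism: valid data is sent to the main output table, while invalid data is routed to the specified error table.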
field_rules [array]
Array of field validation rules. Each rule defines validation criteria for a specific field.
Each field rule contains:
- field_name: Name of the field to validate
- rules: Array of validation rules to apply (nested format), or individual rule properties (flat format)
NOT_NULL
Validates that a field value is not null.
Parameters:
- rule_type: "NOT_NULL"
- custom_message (optional): Custom error message
RANGE
Validates that a numeric value is within a specified range.
Parameters:
- rule_type: "RANGE"
- min_value (optional): Minimum allowed value
- max_value (optional): Maximum allowed value
- min_inclusive (optional): Whether minimum value is inclusive (default: true)
- max_inclusive (optional): Whether maximum value is inclusive (default: true)
- custom_message (optional): Custom error message
LENGTH
Validates the length of string, array, or collection values.
Parameters:
- rule_type: "LENGTH"
- min_length (optional): Minimum allowed length
- max_length (optional): Maximum allowed length
- exact_length (optional): Exact required length
- custom_message (optional): Custom error message
REGEX
Validates that a string value matches a regular expression pattern.
Parameters:
- rule_type: "REGEX"
- pattern: Regular expression pattern (required)
- case_sensitive (optional): Whether pattern matching is case sensitive (default: true)
- custom_message (optional): Custom error message
UDF
Validates field values using custom business logic implemented as a User Defined Function.
Parameters:
- rule_type: "UDF"
- function_name: Name of the UDF function to execute (required)
- custom_message (optional): Custom error message
Built-in UDF Functions:
- EMAIL: Validates email addresses using practical validation rules based on OWASP recommendations
Creating Custom UDF Functions: To create a custom UDF function:
1. Implement the DataValidatorUDF interface
2. Add the @AutoService(DataValidatorUDF.class) annotation
3. Return a unique function name from functionName()
4. Implement the validate() method with your custom logic
For common transform plugin parameters, refer to Transform Plugin.
transform {
  DataValidator {
    plugin_input = "source_table"
    plugin_output = "validated_table"
    row_error_handle_way = "FAIL"
    field_rules = [
      {
        field_name = "name"
        rule_type = "NOT_NULL"
      },
      {
        field_name = "age"
        rule_type = "RANGE"
        min_value = 0
        max_value = 150
      },
      {
        field_name = "email"
        rule_type = "REGEX"
        pattern = "^[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}$"
      }
    ]
  }
}
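The RANGE rule in the configuration above relies on the default inclusive bounds, so both 0 and 150 pass. The effect of min_inclusive and max_inclusive can be sketched in plain Java as follows. RangeSketch and inRange are hypothetical names used for illustration and are not part of the DataValidator API. The sketch also assumes that an omitted min_value or max_value (modeled here as null) leaves that side unbounded, which the parameter list suggests but does not state.

```java
// Hypothetical sketch of RANGE semantics; not SeaTunnel's implementation.
class RangeSketch {

    // Assumption: a null bound means "no limit on that side".
    static boolean inRange(double value, Double min, Double max,
                           boolean minInclusive, boolean maxInclusive) {
        if (min != null) {
            boolean aboveMin = minInclusive ? value >= min : value > min;
            if (!aboveMin) {
                return false;
            }
        }
        if (max != null) {
            boolean belowMax = maxInclusive ? value <= max : value < max;
            if (!belowMax) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Default flags (both bounds inclusive): the boundary value 150 is accepted.
        System.out.println(inRange(150, 0.0, 150.0, true, true));
        // Exclusive upper bound: the boundary value 150 is rejected.
        System.out.println(inRange(150, 0.0, 150.0, true, false));
        // A value outside the range is rejected regardless of the flags.
        System.out.println(inRange(200, 0.0, 150.0, true, true));
    }
}
```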
transform {
  DataValidator {
    plugin_input = "source_table"
    plugin_output = "validated_table"
    row_error_handle_way = "SKIP"
    field_rules = [
      {
        field_name = "name"
        rule_type = "NOT_NULL"
      },
      {
        field_name = "name"
        rule_type = "LENGTH"
        min_length = 2
        max_length = 50
      }
    ]
  }
}
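The LENGTH rule is described as applying to strings, arrays, and collections. A minimal sketch of such a check might look like the following. It rests on two assumptions the source does not confirm: exact_length takes precedence over min_length and max_length when set, and null or unsupported values are rejected. LengthSketch, lengthOf, and lengthOk are hypothetical names.

```java
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of LENGTH semantics; not SeaTunnel's implementation.
class LengthSketch {

    // Returns the length of a string, collection, or array; -1 for anything else.
    static int lengthOf(Object value) {
        if (value instanceof CharSequence) {
            return ((CharSequence) value).length();
        }
        if (value instanceof Collection) {
            return ((Collection<?>) value).size();
        }
        if (value != null && value.getClass().isArray()) {
            return Array.getLength(value);
        }
        return -1;
    }

    // Null bounds are treated as "not configured".
    static boolean lengthOk(Object value, Integer min, Integer max, Integer exact) {
        int len = lengthOf(value);
        if (len < 0) {
            return false; // assumption: null and unsupported types fail
        }
        if (exact != null) {
            return len == exact; // assumption: exact_length wins when set
        }
        if (min != null && len < min) {
            return false;
        }
        return max == null || len <= max;
    }

    public static void main(String[] args) {
        // min_length = 2, max_length = 50, as in the configuration above.
        System.out.println(lengthOk("Al", 2, 50, null));
        System.out.println(lengthOk("A", 2, 50, null));
        System.out.println(lengthOk(List.of(1, 2, 3), null, null, 3));
        System.out.println(lengthOk(new int[] {1, 2}, 1, 1, null));
    }
}
```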
transform {
  DataValidator {
    plugin_input = "source_table"
    plugin_output = "validated_table"
    row_error_handle_way = "ROUTE_TO_TABLE"
    row_error_handle_way.error_table = "error_data"
    field_rules = [
      {
        field_name = "name"
        rule_type = "NOT_NULL"
      },
      {
        field_name = "age"
        rule_type = "RANGE"
        min_value = 0
        max_value = 150
      }
    ]
  }
}
Note: When using ROUTE_TO_TABLE, ensure your sink connector supports multiple tables. Valid data will be sent to the main output table, while invalid data will be routed to the specified error table.
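In this example, valid rows continue to the validated_table output, while rows that fail validation are routed to the error_data table.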
transform {
  DataValidator {
    plugin_input = "source_table"
    plugin_output = "validated_table"
    row_error_handle_way = "FAIL"
    field_rules = [
      {
        field_name = "name"
        rules = [
          {
            rule_type = "NOT_NULL"
            custom_message = "Name is required"
          },
          {
            rule_type = "LENGTH"
            min_length = 2
            max_length = 50
            custom_message = "Name must be between 2 and 50 characters"
          }
        ]
      }
    ]
  }
}
transform {
  DataValidator {
    plugin_input = "source_table"
    plugin_output = "validated_table"
    row_error_handle_way = "FAIL"
    field_rules = [
      {
        field_name = "email"
        rule_type = "UDF"
        function_name = "EMAIL"
        custom_message = "Invalid email address format"
      }
    ]
  }
}
To create a custom validation UDF function, implement the DataValidatorUDF interface as in the following example:
package com.example.validator;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.validator.ValidationContext;
import org.apache.seatunnel.transform.validator.ValidationResult;
import org.apache.seatunnel.transform.validator.udf.DataValidatorUDF;

import com.google.auto.service.AutoService;

@AutoService(DataValidatorUDF.class)
public class PhoneValidator implements DataValidatorUDF {

    @Override
    public String functionName() {
        return "PHONE_VALIDATOR";
    }

    @Override
    public ValidationResult validate(
            Object value, SeaTunnelDataType<?> dataType, ValidationContext context) {
        // Null values pass here; pair this UDF with a NOT_NULL rule to reject them.
        if (value == null) {
            return ValidationResult.success();
        }
        String phone = value.toString().trim();
        // Custom phone validation logic
        if (phone.matches("^\\+?[1-9]\\d{1,14}$")) {
            return ValidationResult.success();
        } else {
            return ValidationResult.failure("Invalid phone number format: " + phone);
        }
    }

    @Override
    public String getDescription() {
        return "Validates international phone number format";
    }
}
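To see which inputs the pattern in PhoneValidator accepts, the regular expression can be exercised on its own, without any SeaTunnel classes. The PhonePatternSketch class below is a hypothetical stand-alone harness that reuses only the regex and the trim step from the example.

```java
import java.util.regex.Pattern;

// Hypothetical harness; only the regular expression is taken from PhoneValidator.
class PhonePatternSketch {

    // Optional '+', one non-zero digit, then 1 to 14 further digits.
    static final Pattern PHONE = Pattern.compile("^\\+?[1-9]\\d{1,14}$");

    static boolean looksLikePhone(String raw) {
        // Mirror the trim() applied before matching in the UDF.
        return PHONE.matcher(raw.trim()).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"+14155552671", " 4155552671 ", "0123456", "1", "+1 415 555"};
        for (String sample : samples) {
            System.out.println("'" + sample + "' -> " + looksLikePhone(sample));
        }
    }
}
```

Inputs with a leading zero, embedded spaces, a single digit, or more than 15 digits are rejected by this pattern, so numbers written with separators would need normalizing before validation.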
The UDF is automatically registered using the @AutoService(DataValidatorUDF.class) annotation. The annotation generates the META-INF/services entry at compile time, so Java's ServiceLoader mechanism can discover and load UDF implementations at runtime.
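The discovery step can be sketched with the standard java.util.ServiceLoader API. The Udf interface and the UdfDiscoverySketch class below are hypothetical stand-ins for DataValidatorUDF and for whatever registry SeaTunnel keeps internally; only ServiceLoader.load and its iteration are real JDK calls. This stand-alone example ships no service registration file, so the loop finds no providers when run as is.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical sketch of ServiceLoader-based discovery; not SeaTunnel's code.
class UdfDiscoverySketch {

    // Stand-in for the real UDF interface, reduced to the lookup key.
    interface Udf {
        String functionName();
    }

    // Collects every provider registered for Udf on the classpath, keyed by function name.
    static Map<String, Udf> discover() {
        Map<String, Udf> registry = new LinkedHashMap<>();
        for (Udf udf : ServiceLoader.load(Udf.class)) {
            registry.put(udf.functionName(), udf);
        }
        return registry;
    }

    public static void main(String[] args) {
        // Prints an empty set here because no provider is registered for Udf.
        System.out.println("Discovered UDFs: " + discover().keySet());
    }
}
```

With a lookup of this shape, the function_name value in a rule would be the key used to find the matching implementation, which is why each UDF needs a unique name.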
Usage Example:
{
  field_name = "email"
  rule_type = "UDF"
  function_name = "EMAIL"
  custom_message = "Please provide a valid email address"
}