DynamicCompile

动态编译插件

描述

:::tip

特别申明 您需要确保服务的安全性,并防止攻击者上传破坏性代码

:::

提供一种可编程的方式来处理行,允许用户自定义任何业务行为,甚至基于现有行字段作为参数的RPC请求,或者通过从其他数据源检索相关数据来扩展字段。为了区分业务,您还可以定义多个转换进行组合, 如果转换过于复杂,可能会影响性能

属性

nametyperequireddefault value
source_codestringno
compile_languageEnumyes
compile_patternEnumnoSOURCE_CODE
absolute_pathstringno

common options [string]

转换插件的常见参数, 请参考 Transform Plugin 了解详情。

compile_language [Enum]

Java中的某些语法可能不受支持,请参阅https://github.com/janino-compiler/janino GROOVY,JAVA

compile_pattern [Enum]

SOURCE_CODE,ABSOLUTE_PATH 选择 SOURCE_CODE,SOURCE_CODE 属性必填;选择ABSOLUTE_PATH,ABSOLUTE_PATH属性必填。

absolute_path [string]

服务器上Java或Groovy文件的绝对路径

source_code [string]

源代码

关于source_code

在代码中,你必须实现两个方法

  • Column[] getInlineOutputColumns(CatalogTable inputCatalogTable)
  • Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow)

getInlineOutputColumns方法中,入参类型为CatalogTable,返回结果为Column[]。 你可以从入参的CatalogTable获取当前表的表结构。 在返回结果中,如果字段已经存在,则会根据返回结果进行覆盖,如果不存在,则会添加到现有表结构中。

getInlineOutputFieldValues方法,入参类型为SeaTunnelRowAccessor,返回结果为Object[] 你可以从SeaTunnelRowAccessor获取到当前行的数据,进行自己的定制化数据处理逻辑。 返回结果中,数组长度需要与getInlineOutputColumns方法返回的长度一致,并且里面的字段值顺序也需要保持一致。

如果有第三方依赖包,请将它们放在${SEATUNNEL_HOME}/lib中,如果您使用spark或flink,则需要将其放在相应服务的libs下。 你需要重启集群服务,才能重新加载这些依赖。

Example

源端数据读取的表格如下:

nameagecard
Joy Ding20123
May Ding20123
Kin Dom30123
Joy Dom30123

我们将使用DynamicCompile对数据进行修改,添加一列compile_language字段,并且将age字段更新,当age=20时将其更新为40

  • 使用groovy
transform {
 DynamicCompile {
    plugin_input = "fake"
    plugin_output = "groovy_out"
    compile_language="GROOVY"
    compile_pattern="SOURCE_CODE"
    source_code="""
                 import org.apache.seatunnel.api.table.catalog.Column
                 import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
                 import org.apache.seatunnel.api.table.catalog.CatalogTable
                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
                 import org.apache.seatunnel.api.table.type.*;
                 import java.util.ArrayList;
                 class demo  {
                    public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
                        PhysicalColumn col1 =
                                PhysicalColumn.of(
                                        "compile_language",
                                        BasicType.STRING_TYPE,
                                        10L,
                                        true,
                                        "",
                                        "");
                        PhysicalColumn col2 =
                                PhysicalColumn.of(
                                        "age",
                                        BasicType.INT_TYPE,
                                        0L,
                                        false,
                                        false,
                                        ""
                                );
                        return new Column[]{
                                col1, col2
                        };
                    }
                
                
                    public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
                        Object[] fieldValues = new Object[2];
                        // get age 
                        Object ageField = inputRow.getField(1);
                        fieldValues[0] = "GROOVY";
                        if (Integer.parseInt(ageField.toString()) == 20) {
                            fieldValues[1] = 40;
                        } else {
                            fieldValues[1] = ageField;
                        }
                        return fieldValues;
                    }
                 };"""

  }
}
  • 使用java
transform {
 DynamicCompile {
    plugin_input = "fake"
    plugin_output = "java_out"
    compile_language="JAVA"
    compile_pattern="SOURCE_CODE"
    source_code="""
                 import org.apache.seatunnel.api.table.catalog.Column;
                 import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
                 import org.apache.seatunnel.api.table.catalog.*;
                 import org.apache.seatunnel.api.table.type.*;
                 import java.util.ArrayList;
                    public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) {
                        PhysicalColumn col1 =
                                PhysicalColumn.of(
                                        "compile_language",
                                        BasicType.STRING_TYPE,
                                        10L,
                                        true,
                                        "",
                                        "");
                        PhysicalColumn col2 =
                                PhysicalColumn.of(
                                        "age",
                                        BasicType.INT_TYPE,
                                        0L,
                                        false,
                                        false,
                                        ""
                                );
                        return new Column[]{
                                col1, col2
                        };
                    }
                
                
                    public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
                        Object[] fieldValues = new Object[2];
                        // get age 
                        Object ageField = inputRow.getField(1);
                        fieldValues[0] = "JAVA";
                        if (Integer.parseInt(ageField.toString()) == 20) {
                            fieldValues[1] = 40;
                        } else {
                            fieldValues[1] = ageField;
                        }
                        return fieldValues;
                    }
                """

  }
 } 
  • 指定源码文件路径
 transform {
 DynamicCompile {
    plugin_input = "fake"
    plugin_output = "groovy_out"
    compile_language="GROOVY"
    compile_pattern="ABSOLUTE_PATH"
    absolute_path="""/tmp/GroovyFile"""

  }
}

那么结果表 groovy_out 中的数据将会更新为:

nameagecardcompile_language
Joy Ding40123GROOVY
May Ding40123GROOVY
Kin Dom30123GROOVY
Joy Dom30123GROOVY

那么结果表 java_out 中的数据将会更新为:

nameagecardcompile_language
Joy Ding40123JAVA
May Ding40123JAVA
Kin Dom30123JAVA
Joy Dom30123JAVA

更多复杂例子可以参考 https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf

Changelog