import ChangeLog from ‘../changelog/connector-fake.md’;

FakeSource

FakeSource 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

描述

FakeSource 是一个虚拟数据源,它根据用户定义的 schema 数据结构随机生成指定数量的行数据,主要用于类型转换或连接器新功能测试等测试场景。

主要特性

数据源选项

名称类型必填默认值描述
tables_configslist-定义多个 FakeSource,每个项可以包含完整的 FakeSource 配置描述
schemaconfig-定义 Schema 信息
auto.increment.enabledbooleanfalse启用自动递增ID
auto.increment.startint自动递增ID的起始值
row.numint5每个并行度生成的数据总行数
split.numint1枚举器为每个并行度生成的分片数量
split.read-intervallong1读取器在两个分片读取之间的间隔时间(毫秒)
map.sizeint5连接器生成的 map 类型的大小
array.sizeint5连接器生成的 array 类型的大小
bytes.lengthint5连接器生成的 bytes 类型的长度
string.lengthint5连接器生成的 string 类型的长度
string.fake.modestringrange生成字符串数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 string.template 选项
string.templatelist-连接器生成的字符串类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
tinyint.fake.modestringrange生成 tinyint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 tinyint.template 选项
tinyint.mintinyint0连接器生成的 tinyint 数据的最小值
tinyint.maxtinyint127连接器生成的 tinyint 数据的最大值
tinyint.templatelist-连接器生成的 tinyint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
smallint.fake.modestringrange生成 smallint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 smallint.template 选项
smallint.minsmallint0连接器生成的 smallint 数据的最小值
smallint.maxsmallint32767连接器生成的 smallint 数据的最大值
smallint.templatelist-连接器生成的 smallint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
int.fake.templatestringrange生成 int 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 int.template 选项
int.minsmallint0连接器生成的 int 数据的最小值
int.maxsmallint0x7fffffff连接器生成的 int 数据的最大值
int.templatelist-连接器生成的 int 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
bigint.fake.modestringrange生成 bigint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 bigint.template 选项
bigint.minbigint0连接器生成的 bigint 数据的最小值
bigint.maxbigint0x7fffffffffffffff连接器生成的 bigint 数据的最大值
bigint.templatelist-连接器生成的 bigint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
float.fake.modestringrange生成 float 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 float.template 选项
float.minfloat0连接器生成的 float 数据的最小值
float.maxfloat0x1.fffffeP+127连接器生成的 float 数据的最大值
float.templatelist-连接器生成的 float 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
double.fake.modestringrange生成 double 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 double.template 选项
double.mindouble0连接器生成的 double 数据的最小值
double.maxdouble0x1.fffffffffffffP+1023连接器生成的 double 数据的最大值
double.templatelist-连接器生成的 double 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
vector.dimensionint4生成的向量的维度,不包括二进制向量
binary.vector.dimensionint8生成的二进制向量的维度
vector.float.minfloat0连接器生成的向量中 float 数据的最小值
vector.float.maxfloat0x1.fffffeP+127连接器生成的向量中 float 数据的最大值
common-options-数据源插件通用参数,详情请参考 Source Common Options

任务示例

简单示例

此示例随机生成指定类型的数据。如果您想了解如何声明字段类型,请点击 这里

schema = {
  fields {
    c_map = "map<string, array<int>>"
    c_map_nest = "map<string, {c_int = int, c_string = string}>"
    c_array = "array<int>"
    c_string = string
    c_boolean = boolean
    c_tinyint = tinyint
    c_smallint = smallint
    c_int = int
    c_bigint = bigint
    c_float = float
    c_double = double
    c_decimal = "decimal(30, 8)"
    c_null = "null"
    c_bytes = bytes
    c_date = date
    c_timestamp = timestamp
    c_row = {
      c_map = "map<string, map<string, string>>"
      c_array = "array<int>"
      c_string = string
      c_boolean = boolean
      c_tinyint = tinyint
      c_smallint = smallint
      c_int = int
      c_bigint = bigint
      c_float = float
      c_double = double
      c_decimal = "decimal(30, 8)"
      c_null = "null"
      c_bytes = bytes
      c_date = date
      c_timestamp = timestamp
    }
  }
}

随机生成

随机生成 16 条符合类型的数据

source {
  # 这是一个示例输入插件,**仅用于测试和演示功能输入插件**
  FakeSource {
    row.num = 16
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    plugin_output = "fake"
  }
}

自定义数据内容简单示例

这是一个自定义数据源信息的示例,定义每条数据是添加还是删除修改操作,并定义每个字段存储的内容

source {
  FakeSource {
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
      }
      {
        kind = UPDATE_BEFORE
        fields = [{"a": "c"}, [102], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
      }
      {
        kind = UPDATE_AFTER
        fields = [{"a": "e"}, [103], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
      }
      {
        kind = DELETE
        fields = [{"a": "f"}, [104], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
      }
    ]
  }
}

由于 HOCON 规范的限制,用户无法直接创建字节序列对象。FakeSource 使用字符串来分配 bytes 类型的值。在上面的示例中,bytes 类型字段被分配了 "bWlJWmo=",这是通过 base64 编码的 “miIZj”。因此,在为 bytes 类型字段赋值时,请使用 base64 编码的字符串。

指定数据数量简单示例

此案例指定生成数据的数量以及生成值的长度

FakeSource {
  row.num = 10
  map.size = 10
  array.size = 10
  bytes.length = 10
  string.length = 10
  schema = {
    fields {
      c_map = "map<string, array<int>>"
      c_array = "array<int>"
      c_string = string
      c_boolean = boolean
      c_tinyint = tinyint
      c_smallint = smallint
      c_int = int
      c_bigint = bigint
      c_float = float
      c_double = double
      c_decimal = "decimal(30, 8)"
      c_null = "null"
      c_bytes = bytes
      c_date = date
      c_timestamp = timestamp
      c_row = {
        c_map = "map<string, map<string, string>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

模板数据简单示例

根据指定模板随机生成

使用模板

FakeSource {
  row.num = 5
  string.fake.mode = "template"
  string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"]
  tinyint.fake.mode = "template"
  tinyint.template = [1, 2, 3, 4, 5, 6, 7, 8, 9]
  smalling.fake.mode = "template"
  smallint.template = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
  int.fake.mode = "template"
  int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
  bigint.fake.mode = "template"
  bigint.template = [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
  float.fake.mode = "template"
  float.template = [40.0, 41.0, 42.0, 43.0]
  double.fake.mode = "template"
  double.template = [44.0, 45.0, 46.0, 47.0]
  schema {
    fields {
      c_string = string
      c_tinyint = tinyint
      c_smallint = smallint
      c_int = int
      c_bigint = bigint
      c_float = float
      c_double = double
    }
  }
}

范围数据简单示例

在指定的数据生成范围内随机生成

FakeSource {
  row.num = 5
  string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"]
  tinyint.min = 1
  tinyint.max = 9
  smallint.min = 10
  smallint.max = 19
  int.min = 20
  int.max = 29
  bigint.min = 30
  bigint.max = 39
  float.min = 40.0
  float.max = 43.0
  double.min = 44.0
  double.max = 47.0
  schema {
    fields {
      c_string = string
      c_tinyint = tinyint
      c_smallint = smallint
      c_int = int
      c_bigint = bigint
      c_float = float
      c_double = double
    }
  }
}

生成多张表

这是一个生成多数据源测试表 test.table1test.table2 的示例

FakeSource {
  tables_configs = [
    {
      row.num = 16
      schema {
        table = "test.table1"
        fields {
          c_string = string
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
        }
      }
    },
    {
      row.num = 17
      schema {
        table = "test.table2"
        fields {
          c_string = string
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
        }
      }
    }
  ]
}

rows 选项示例

rows = [
  {
    kind = INSERT
    fields = [1, "A", 100]
  },
  {
    kind = UPDATE_BEFORE
    fields = [1, "A", 100]
  },
  {
    kind = UPDATE_AFTER
    fields = [1, "A_1", 100]
  },
  {
    kind = DELETE
    fields = [1, "A_1", 100]
  }
]

table-names 选项示例

source {
  # 这是一个示例源插件,**仅用于测试和演示源插件功能**
  FakeSource {
    table-names = ["test.table1", "test.table2", "test.table3"]
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

defaultValue 选项示例

可以通过 rowcolumns 生成自定义数据。对于时间类型,可以通过 CURRENT_TIMESTAMPCURRENT_TIMECURRENT_DATE 获取当前时间。

    schema = {
        fields {
            pk_id = bigint
            name = string
            score = int
            time1 = timestamp
            time2 = time
            time3 = date
        }
    }
    # 使用 rows
    rows = [
        {
            kind = INSERT
            fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
        }
    ]
      schema = {
          # 使用 columns
           columns = [
           {
              name = book_publication_time
              type = timestamp
              defaultValue = "2024-09-12 15:45:30"
              comment = "书籍出版时间"
           },
           {
              name = book_publication_time2
              type = timestamp
              defaultValue = CURRENT_TIMESTAMP
              comment = "书籍出版时间2"
           },
           {
              name = book_publication_time3
              type = time
              defaultValue = "15:45:30"
              comment = "书籍出版时间3"
           },
           {
              name = book_publication_time4
              type = time
              defaultValue = CURRENT_TIME
              comment = "书籍出版时间4"
           },
           {
              name = book_publication_time5
              type = date
              defaultValue = "2024-09-12"
              comment = "书籍出版时间5"
           },
           {
              name = book_publication_time6
              type = date
              defaultValue = CURRENT_DATE
              comment = "书籍出版时间6"
           }
       ]
      }

使用向量示例

source {
  FakeSource {
      row.num = 10
      # 低优先级 
      vector.dimension= 4
      binary.vector.dimension = 8
      # 低优先级 
      schema = {
           table = "simple_example"
           columns = [
           {
              name = book_id
              type = bigint
              nullable = false
              defaultValue = 0
              comment = "主键 ID"
           },
            {
              name = book_intro_1
              type = binary_vector
              columnScale =8
              comment = "向量"
           },
           {
              name = book_intro_2
              type = float16_vector
              columnScale =4
              comment = "向量"
           },
           {
              name = book_intro_3
              type = bfloat16_vector
              columnScale =4
              comment = "向量"
           },
           {
              name = book_intro_4
              type = sparse_float_vector
              columnScale =4
              comment = "向量"
           }
       ]
     }
  }
}

自增主键示例


source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { plugin_output = "fake" auto.increment.enabled = true auto.increment.start = 1000 row.num = 50000 schema = { fields { id = "int" name = "string" age = "int" } primaryKey { name = "pk" columnNames = [id] } } } }

变更日志