import ChangeLog from ‘../changelog/connector-graphql.md’;

GraphQL

GraphQL sink 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

接收Source端传入的数据,利用数据触发 web hooks。

例如,来自上游的数据为 [label: {"__name__": "test1"}, value: 1.2.3,time:2024-08-15T17:00:00], 则body内容如下: {"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}

Tips: GraphQL 数据接收器 仅支持 post json 类型的 web hook,source 数据将被视为 webhook 中的 body 内容。并且不支持传递过去太久的数据

支持的数据源信息

想使用 Prometheus 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖

数据源支持版本依赖
HttpuniversalDownload

接收器选项

NameTypeRequiredDefaultDescription
urlStringYes-Http request url
queryStringYes-GraphQL query
variablesStringNo-GraphQL variables
valueCoverBooleanNo-Whether the data overwrites the variable value
headersMapNo-Http headers
retryIntNo-The max retry times if request http return to IOException
retry_backoff_multiplier_msIntNo100The retry-backoff times(millis) multiplier if request http failed
retry_backoff_max_msIntNo10000The maximum retry-backoff times(millis) if request http failed
connect_timeout_msIntNo12000Connection timeout setting, default 12s.
socket_timeout_msIntNo60000Socket timeout setting, default 60s.
key_timestampIntNO-prometheus timestamp key .
key_labelStringyes-prometheus label key
key_valueDoubleyes-prometheus value
batch_sizeIntfalse1024prometheus batch size write
flush_intervalLongfalse300000Lprometheus flush commit interval
common-optionsNo-Sink plugin common parameters, please refer to Sink Common Options for details

示例

简单示例:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
       {
        schema = {
          table = "graphql_sink_1"
         fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
      }
        }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
              }
              ]
       },
       {
       schema = {
         table = "graphql_sink_2"
              fields {
                        id = int
                        val_bool = boolean
                        val_int8 = tinyint
                        val_int16 = smallint
                        val_int32 = int
                        val_int64 = bigint
                        val_float = float
                        val_double = double
                        val_decimal = "decimal(16, 1)"
                        val_string = string
                        val_unixtime_micros = timestamp
              }
       }
           rows = [
             {
               kind = INSERT
               fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
             }
             ]
      }
    ]
  }
}

sink {
   GraphQL {
        url = "http://192.168.1.103:9081/v1/graphql"
        query = """
         mutation MyMutation(
           $id: Int!
           $val_bool: Boolean!
           $val_int8: smallint!
           $val_int16: smallint!
           $val_int32: Int!
           $val_int64: bigint!
           $val_float: Float!
           $val_double: Float!
           $val_decimal: numeric!
           $val_string: String!
           $val_unixtime_micros: timestamp!
         ) {
           insert_sink(objects: {
             id: $id,
             val_bool: $val_bool,
             val_int8: $val_int8,
             val_int16: $val_int16,
             val_int32: $val_int32,
             val_int64: $val_int64,
             val_float: $val_float,
             val_double: $val_double,
             val_decimal: $val_decimal,
             val_string: $val_string,
             val_unixtime_micros: $val_unixtime_micros
           }) {
             affected_rows
             returning {
               id
               val_bool
               val_decimal
               val_double
               val_float
               val_int16
               val_int32
               val_int64
               val_int8
               val_string
               val_unixtime_micros
             }
           }
         }
        """
        variables = {
            "val_bool": True
        }
    }
}

变更日志