Flink IoTDB Connector

This module integrates IoTDB with Apache Flink. It provides an IoTDB sink that lets a Flink job write time-series data to IoTDB.

IoTDBSink

To use IoTDBSink, you need to define an IoTDBOptions instance and an IoTSerializationSchema instance. By default, IoTDBSink sends one data point at a time; call withBatchSize(int) to change this.
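
To make the effect of withBatchSize(int) concrete, here is a minimal sketch of size-based batching. It is an illustration only and not the connector's implementation: the class BatchBuffer and its flusher callback are hypothetical names, and the real sink may also flush on other triggers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of size-based batching; not the connector's code. */
public class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> flusher;
  private final List<T> pending = new ArrayList<>();

  public BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    this.batchSize = batchSize;
    this.flusher = flusher;
  }

  /** Buffers one record and flushes once batchSize records have accumulated. */
  public synchronized void add(T record) {
    pending.add(record);
    if (pending.size() >= batchSize) {
      flush();
    }
  }

  /** Sends whatever is still buffered, for example when the sink is closed. */
  public synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    flusher.accept(new ArrayList<>(pending)); // hand over a copy of the batch
    pending.clear();
  }

  public static void main(String[] args) {
    List<List<Integer>> sent = new ArrayList<>();
    BatchBuffer<Integer> buffer = new BatchBuffer<>(10, sent::add);
    for (int i = 0; i < 25; i++) {
      buffer.add(i);
    }
    buffer.flush(); // send the 5 leftover records
    System.out.println(sent.size() + " requests"); // 3 requests: 10 + 10 + 5
  }
}
```

With a batch size of 1 every record triggers its own write, which matches the default behavior described above; a larger batch size trades a little latency for fewer round trips to the server.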

Example

This example demonstrates a Flink job sending data to an IoTDB server:

  • A simulated source, SensorSource, generates one data point per second.

  • Flink consumes the generated data with IoTDBSink and writes it to IoTDB.

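    // Same package as the connector classes (IoTDBOptions, IoTDBSink, ...), so they need no import.
    package org.apache.iotdb.flink;
    
    import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;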
    import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
    import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
    
    import com.google.common.collect.Lists;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    
    import java.security.SecureRandom;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    public class FlinkIoTDBSink {
      public static void main(String[] args) throws Exception {
        // run the flink job on local mini cluster
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        IoTDBOptions options = new IoTDBOptions();
        options.setHost("127.0.0.1");
        options.setPort(6667);
        options.setUser("root");
        options.setPassword("root");
        options.setStorageGroup("root.sg");
    
        // If the server enables auto_create_schema, then we do not need to register all timeseries
        // here.
        options.setTimeseriesOptionList(
            Lists.newArrayList(
                new IoTDBOptions.TimeseriesOption(
                    "root.sg.d1.s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY)));
    
        IoTSerializationSchema serializationSchema = new DefaultIoTSerializationSchema();
        IoTDBSink ioTDBSink =
            new IoTDBSink(options, serializationSchema)
                // enable batching
                .withBatchSize(10)
                // how many connections to the server are created for each parallel sink instance
                .withSessionPoolSize(3);
    
        env.addSource(new SensorSource())
            .name("sensor-source")
            .setParallelism(1)
            .addSink(ioTDBSink)
            .name("iotdb-sink");
    
        env.execute("iotdb-flink-example");
      }
    
      private static class SensorSource implements SourceFunction<Map<String, String>> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;
        private final Random random = new SecureRandom();
    
        @Override
        public void run(SourceContext<Map<String, String>> context) throws Exception {
          while (running) {
            Map<String, String> tuple = new HashMap<>();
            tuple.put("device", "root.sg.d1");
            tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
            tuple.put("measurements", "s1");
            tuple.put("types", "DOUBLE");
            tuple.put("values", String.valueOf(random.nextDouble()));
    
            context.collect(tuple);
            Thread.sleep(1000);
          }
        }
    
        @Override
        public void cancel() {
          running = false;
        }
      }
    }
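
The source above emits each data point as a Map<String, String> with the keys device, timestamp, measurements, types and values. As a small sketch of that record layout, the helper below builds one such map for a single measurement. SensorTuples and its method of(...) are hypothetical names introduced here for illustration; they are not part of the connector.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; the class and method names are not part of the connector. */
public final class SensorTuples {
  private SensorTuples() {}

  /** Builds one record with the same keys that SensorSource puts into its map. */
  public static Map<String, String> of(
      String device, long timestamp, String measurement, String type, double value) {
    Map<String, String> tuple = new HashMap<>();
    tuple.put("device", device);
    tuple.put("timestamp", String.valueOf(timestamp));
    tuple.put("measurements", measurement);
    tuple.put("types", type);
    tuple.put("values", String.valueOf(value));
    return tuple;
  }

  public static void main(String[] args) {
    Map<String, String> tuple = SensorTuples.of("root.sg.d1", 1000L, "s1", "DOUBLE", 0.5);
    System.out.println(tuple.get("values")); // prints 0.5
  }
}
```

Every field is carried as a string, so numeric values and timestamps are converted with String.valueOf before they are handed to the sink.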

How to Run

  • Start the IoTDB server.
  • Run org.apache.iotdb.flink.FlinkIoTDBSink.java to execute the Flink job on a local cluster.
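
Because the job can only write once the server is up, it may help to check that the address used in the example (127.0.0.1:6667) accepts TCP connections before launching it. The following is an optional sketch that uses only the JDK; ServerCheck is a hypothetical name, and a successful connection shows only that something is listening on the port, not that it is IoTDB.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical pre-flight check; not part of IoTDB or the connector. */
public final class ServerCheck {
  private ServerCheck() {}

  /** Returns true if a TCP connection to host:port succeeds within the timeout. */
  public static boolean isReachable(String host, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      // connection refused, timed out, or host unreachable
      return false;
    }
  }

  public static void main(String[] args) {
    // Host and port are the values set on IoTDBOptions in the example above.
    boolean up = isReachable("127.0.0.1", 6667, 1000);
    System.out.println(up ? "server port is open" : "server port is not reachable");
  }
}
```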