tree: 2d8d2dbc7ad1b1894df0dea5fd3e42a1ef2cc45b [path history] [tgz]
  1. README.md
docs/stream_source/README.md

创建实时任务数据源

通过 `DataStreamSource dataStreamSource = StreamBuilder.dataStream("test_namespace", "graph_pipeline");` 完成 dataStreamSource 的构建后，就可以通过 dataStreamSource 来创建数据源了。

我们可以从如下数据中创建数据源

数组

    Object[] array = .....; //数组
    DataStream dataStream = dataStreamSource.fromArray(array);

文件

    String filePath = .....; //文件路径
    DataStream dataStream = dataStreamSource.fromFile(filePath);

或者

    String filePath = .....;  //文件路径
    Boolean isJsonData = true;   //是否json格式
    DataStream dataStream = dataStreamSource.fromFile(filePath, isJsonData);

Rocketmq


    String topic = .....; //rocketmq 的topic
    String groupName = .....; // rocketmq的消费组
    String namesrvAddress = ......; //rocketmq的nameserver
    DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, namesrvAddress);

或者


    String topic = .....; //rocketmq 的topic
    String groupName = .....; // rocketmq的消费组
    String namesrvAddress = ......; //rocketmq的nameserver
    Boolean isJsonData = true; //是否json
    DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, isJsonData, namesrvAddress);

或者


    String topic = .....; //rocketmq 的topic
    String groupName = .....; // rocketmq的消费组
    String namesrvAddress = ......; //rocketmq的nameserver
    Boolean isJsonData = true; //是否json
    String tags = ......; // rocketmq的tag信息
    DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, tags, isJsonData, namesrvAddress);

MQTT协议


    String url = ......;
    String clientId = ......;
    String topic = ......;
    DataStream dataStream = dataStreamSource.fromMqtt(url, clientId, topic);

或者


    String url = ......;
    String clientId = ......;
    String topic = ......;
    String username = ......;
    String password = ......;
    DataStream dataStream = dataStreamSource.fromMqtt(url, clientId, topic, username, password);

或者


    String url = ......;
    String clientId = ......;
    String topic = ......;
    String username = ......;
    String password = ......;
    Boolean cleanSession = true;
    Integer connectionTimeout = 10;
    Integer aliveInterval = 60;
    Boolean automaticReconnect = true;
    DataStream dataStream = dataStreamSource.fromMqtt(url, clientId, topic, username, password, cleanSession, connectionTimeout, aliveInterval, automaticReconnect);

## 自定义Source

    DataStream dataStream = dataStreamSource.from(new ISource<ISource>() {});