创建实时任务数据输出

我们可以将实时任务处理的结果以如下的方式输出

打印

    dataStream.toPrint();

或者

    dataStream.toPrint(batchSize);

文件

    String filePath=......;
    Integer batchSize=.....;
    Boolean isAppend=true;
    dataStream.toFile(filePath,batchSize,isAppend);

或者

    String filePath=......;
    Boolean isAppend=true;
    dataStream.toFile(filePath,isAppend);

或者

    String filePath=......;
    dataStream.toFile(filePath);

DB

    String url=......;
    String userName=.....;
    String password=......;
    String tableName=......;
    dataStream.toDB(url,userName,password,tableName);

Rocketmq


String topic=.....; //rocketmq 的topic String namesrvAddress=......; //rocketmq的nameserver DataStream dataStream=dataStream.toRocketmq(topic,namesrvAddress);

或者


String topic=.....; //rocketmq 的topic String groupName=.....; // rocketmq的消费组 String namesrvAddress=......; //rocketmq的nameserver DataStream dataStream=dataStream.toRocketmq(topic,groupName,namesrvAddress);

或者


String topic=.....; //rocketmq 的topic String groupName=.....; // rocketmq的消费组 String namesrvAddress=......; //rocketmq的nameserver String tags=......; // rocketmq的tag信息 DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);

kafka

    String bootstrapServers = ......;//kafka的bootstrap server
    String topic = ......; //kafka的topic
    DataStream dataStream = dataStream.toKafka(bootstrapServers, topic);

MQTT协议


String url=......; String clientId=......; String topic=......; DataStream dataStream=dataStream.toMqtt(url,cliientId,topic);

或者


String url=......; String clientId=......; String topic=......; String username=......; String password=......; DataStream dataStream=dataStream.toMqtt(url,cliientId,topic,username,password);

自定义Sink

    dataStreamSource.to(new ISink<ISource>(){});