我们可以将实时任务处理的结果以如下的方式输出
dataStream.toPrint();
或者
dataStream.toPrint(batchSize);
String filePath=......; Integer batchSize=.....; Boolean isAppend=true; dataStream.toFile(filePath,batchSize,isAppend);
或者
String filePath=......; Boolean isAppend=true; dataStream.toFile(filePath,isAppend);
或者
String filePath=......; dataStream.toFile(filePath);
String url=......; String userName=.....; String password=......; String tableName=......; dataStream.toDB(url,userName,password,tableName);
String topic=.....; // rocketmq的topic
String namesrvAddress=......; // rocketmq的nameserver
DataStream dataStream=dataStream.toRocketmq(topic,namesrvAddress);
或者
String topic=.....; // rocketmq的topic
String groupName=.....; // rocketmq的消费组
String namesrvAddress=......; // rocketmq的nameserver
DataStream dataStream=dataStream.toRocketmq(topic,groupName,namesrvAddress);
或者
String topic=.....; // rocketmq的topic
String groupName=.....; // rocketmq的消费组
String namesrvAddress=......; // rocketmq的nameserver
String tags=......; // rocketmq的tag信息
DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress);
String bootstrapServers = ......; // kafka的bootstrap server
String topic = ......; // kafka的topic
DataStream dataStream = dataStream.toKafka(bootstrapServers, topic);
String url=......; String clientId=......; String topic=......; DataStream dataStream=dataStream.toMqtt(url,clientId,topic);
或者
String url=......; String clientId=......; String topic=......; String username=......; String password=......; DataStream dataStream=dataStream.toMqtt(url,clientId,topic,username,password);
dataStreamSource.to(new ISink<ISource>(){});