dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .map(message -> message + "~~~~~")
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .filter(message -> message.contains("xxxxx")) // if true, the message continues downstream; otherwise it is blocked
    .toPrint()
    .start();
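The map, flatMap and filter operators above behave like their counterparts in `java.util.stream`. The following plain-Java sketch does not use the `dataStream` API; it applies the same three transformations to in-memory lists, with hypothetical sample messages and a nested list standing in for the `Data` array:

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch of map / flatMap / filter semantics (not the dataStream API).
class OperatorSemantics {

    // map: exactly one output element per input element (here: append a suffix).
    static List<String> mapSuffix(List<String> messages, String suffix) {
        return messages.stream()
                .map(m -> m + suffix)
                .collect(Collectors.toList());
    }

    // flatMap: each input element expands to zero or more output elements
    // (here each message is a list standing in for its "Data" array).
    static List<String> flattenData(List<List<String>> messages) {
        return messages.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // filter: elements for which the predicate is true continue downstream; the rest are dropped.
    static List<String> keepContaining(List<String> messages, String token) {
        return messages.stream()
                .filter(m -> m.contains(token))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapSuffix(List.of("a", "b"), "~~~~~"));                          // [a~~~~~, b~~~~~]
        System.out.println(flattenData(List.of(List.of("x", "y"), List.of(), List.of("z")))); // [x, y, z]
        System.out.println(keepContaining(List.of("alarm-1", "ok", "alarm-2"), "alarm"));     // [alarm-1, alarm-2]
    }
}
```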
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .forEach(message -> message.contains("xxxxx")) // if true, the message continues downstream; otherwise it is blocked
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .forEach(message -> message.contains("xxxxx")) // if true, the message continues downstream; otherwise it is blocked
    .selectFields("field1", "field2")              // keep only the listed fields
    .toPrint()
    .start();
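`selectFields` appears to project each record down to the named fields. Assuming map-shaped records (an assumption; the actual record type is not shown here), that projection can be sketched in plain Java as follows; `FieldProjection` is a hypothetical helper, not part of the `dataStream` API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of field projection on a map-shaped record (hypothetical record type).
class FieldProjection {

    // Keep only the requested fields, in the requested order; fields absent from the record are skipped.
    static Map<String, Object> selectFields(Map<String, Object> record, String... fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String f : fields) {
            if (record.containsKey(f)) {
                out.put(f, record.get(f));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of("field1", 1, "field2", "b", "field3", true);
        System.out.println(selectFields(record, "field1", "field2")); // {field1=1, field2=b}
    }
}
```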
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .script("ProjectName, =, project") // if true, the message continues downstream; otherwise it is blocked
    .toPrint()
    .start();
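Windowed aggregation runs statistical analysis over the data inside a window and is usually combined with groupBy: window() defines the window size, and groupBy() defines the key(s) the statistics are grouped by. Several keys can be specified.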
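A tumbling window such as `TumblingWindow.of(Time.minutes(1))` cuts time into fixed-size, non-overlapping intervals, so each event falls into exactly one window. A minimal sketch of that assignment, assuming millisecond epoch timestamps and windows aligned to the epoch (the engine's own alignment and time semantics may differ):

```java
import java.time.Duration;

// Sketch of tumbling-window assignment for epoch-aligned, half-open windows [start, start + size).
class TumblingWindowAssigner {

    // Start of the window containing the timestamp; floorMod keeps this correct for negative timestamps.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis(); // 60000 ms
        long ts = 125_000L;                             // 2 min 5 s after the epoch
        System.out.println(windowStart(ts, size) + " .. " + windowEnd(ts, size)); // 120000 .. 180000
    }
}
```

Because the windows are half-open, an event stamped exactly on a boundary opens the next window rather than closing the previous one.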
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .count("asName") // specify an alias for the result
    .toDataSteam()
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .avg("field", "avg_value")
    .toDataSteam()
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .min("field")
    .toDataSteam()
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .max("field")
    .toDataSteam()
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .sum("field", "asField")
    .toDataSteam()
    .toPrint()
    .start();
dataStream.fromMqtt("xxxxx", "xxxx", "xxxxxx", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .reduce(new ReduceFunction() { /* reduce logic elided */ })
    .toDataSteam()
    .toPrint()
    .start();
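Inside one window, `groupBy` followed by `count`, `sum`, `min`, `max` or `avg` is an ordinary grouped aggregation, and `reduce` folds each group with a user-supplied function. The plain-Java sketch below computes the same figures for the events of a single, already-closed window. The `Event` class and sample values are hypothetical, and window bookkeeping and output aliases (`asName`, `avg_value`) are left out:

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Plain-Java sketch of per-key aggregation over the events of ONE window (not the dataStream API).
class WindowAggregation {

    // Hypothetical event: a group key (like "AttributeCode") and a numeric field.
    static final class Event {
        final String attributeCode;
        final double value;

        Event(String attributeCode, double value) {
            this.attributeCode = attributeCode;
            this.value = value;
        }
    }

    // count, sum, min, max and average per key in a single pass.
    static Map<String, DoubleSummaryStatistics> aggregate(List<Event> windowEvents) {
        return windowEvents.stream().collect(Collectors.groupingBy(
                e -> e.attributeCode,
                TreeMap::new,
                Collectors.summarizingDouble(e -> e.value)));
    }

    // reduce: fold the values of each key pairwise with a user-supplied function.
    static Map<String, Double> reduce(List<Event> windowEvents, BinaryOperator<Double> fn) {
        return windowEvents.stream().collect(Collectors.toMap(
                e -> e.attributeCode,
                e -> e.value,
                fn,
                TreeMap::new));
    }

    public static void main(String[] args) {
        List<Event> window = List.of(new Event("temp", 20), new Event("temp", 24), new Event("hum", 55));
        aggregate(window).forEach((key, s) -> System.out.printf(
                "%s count=%d sum=%.1f min=%.1f max=%.1f avg=%.1f%n",
                key, s.getCount(), s.getSum(), s.getMin(), s.getMax(), s.getAverage()));
        System.out.println(reduce(window, Double::sum)); // {hum=55.0, temp=44.0}
    }
}
```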
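Join computation: joins two streams, or a stream and a physical table, on a condition and outputs the result.
Inner join of two streams on a condition: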
DataStream left = ......;
DataStream right = ......;
left.join(right)
    .on("(ProjectName,=,project)")
    .toDataSteam()
    .toPrint()
    .start();
Left join of the data of two streams on a condition:
DataStream left = ......;
DataStream right = ......;
left.leftJoin(right)
    .on("(ProjectName,=,project)")
    .toDataSteam()
    .toPrint()
    .start();
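Reading the condition `(ProjectName,=,project)` as "left `ProjectName` equals right `project`" (an interpretation of the condition syntax), the two joins differ only in what happens to a left record that finds no match. A streaming engine must buffer both sides, typically per window, before it can match them; the sketch below ignores that and joins two finite lists of map-shaped records with a nested loop, purely to show inner-join versus left-join semantics. Record contents are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Nested-loop sketch of inner vs left join on finite lists (not the dataStream API).
class JoinSemantics {

    static List<Map<String, String>> innerJoin(List<Map<String, String>> left, List<Map<String, String>> right,
                                               String leftKey, String rightKey) {
        return join(left, right, leftKey, rightKey, false);
    }

    static List<Map<String, String>> leftJoin(List<Map<String, String>> left, List<Map<String, String>> right,
                                              String leftKey, String rightKey) {
        return join(left, right, leftKey, rightKey, true);
    }

    // Pairs each left row with every right row whose rightKey value equals the left row's leftKey value.
    // A null key never matches, mirroring SQL join semantics.
    private static List<Map<String, String>> join(List<Map<String, String>> left, List<Map<String, String>> right,
                                                  String leftKey, String rightKey, boolean keepUnmatchedLeft) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> l : left) {
            String key = l.get(leftKey);
            boolean matched = false;
            for (Map<String, String> r : right) {
                if (key != null && key.equals(r.get(rightKey))) {
                    Map<String, String> row = new TreeMap<>(l); // merged row; right-side values win on name clashes
                    row.putAll(r);
                    out.add(row);
                    matched = true;
                }
            }
            if (!matched && keepUnmatchedLeft) {
                out.add(new TreeMap<>(l)); // left join keeps the unmatched left row, without right-side fields
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, String>> left = List.of(
                Map.of("ProjectName", "p1", "v", "1"),
                Map.of("ProjectName", "p2", "v", "2"));
        List<Map<String, String>> right = List.of(Map.of("project", "p1", "owner", "alice"));
        System.out.println(innerJoin(left, right, "ProjectName", "project"));
        // [{ProjectName=p1, owner=alice, project=p1, v=1}]
        System.out.println(leftJoin(left, right, "ProjectName", "project"));
        // [{ProjectName=p1, owner=alice, project=p1, v=1}, {ProjectName=p2, v=2}]
    }
}
```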
Inner join of a stream with a dimension table on a condition. The dimension table's data can come from a file or from a database.
DataStream dataStream = ......;
dataStream
    .dimJoin("classpath://dim.txt", 10000)
    .on("(ProjectName,=,project)")
    .toDataSteam()
    .toPrint()
    .start();
Left join of a stream with a dimension table on a condition. The dimension table's data can come from a file or from a database.
DataStream dataStream = ......;
dataStream
    .dimLeftJoin("classpath://dim.txt", 10000)
    .on("(ProjectName,=,project)")
    .toDataSteam()
    .toPrint()
    .start();
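Unlike a stream-stream join, a dimension-table join looks each stream record up in a bounded table, which is usually held in memory as an index. The sketch below assumes a hypothetical `key,value` line format for the table and represents each stream record by its join key only; it does not reproduce how `dimJoin`/`dimLeftJoin` load or refresh the table:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a dimension-table lookup join (not the dataStream API).
class DimJoinSketch {

    // Build an in-memory index from "key,value" lines (hypothetical format); malformed lines are skipped.
    static Map<String, String> loadDimTable(List<String> lines) {
        Map<String, String> index = new HashMap<>();
        for (String line : lines) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                index.put(parts[0].trim(), parts[1].trim());
            }
        }
        return index;
    }

    // Look up each stream key: the inner join drops misses, the left join keeps them with a null value.
    static List<String> dimJoin(List<String> streamKeys, Map<String, String> dim, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        for (String key : streamKeys) {
            String value = dim.get(key);
            if (value != null || leftJoin) {
                out.add(key + "->" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // In practice the lines might come from java.nio.file.Files.readAllLines(path).
        Map<String, String> dim = loadDimTable(List.of("p1,alice", "p2,bob"));
        List<String> events = List.of("p1", "p3", "p2");
        System.out.println(dimJoin(events, dim, false)); // [p1->alice, p2->bob]
        System.out.println(dimJoin(events, dim, true));  // [p1->alice, p3->null, p2->bob]
    }
}
```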
Merge (union) two streams:
DataStream leftStream = ......;
DataStream rightStream = ......;
leftStream.union(rightStream)
    .toPrint()
    .start();
Split one data stream by label into several streams for downstream analysis and computation:
DataStream dataStream = ......;
dataStream.split(new SplitFunction<JSONObject>() { /* labeling logic elided */ })
    .toPrint()
    .start();
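Splitting routes each record to an output stream chosen by a label computed from the record. Because the `SplitFunction` body is elided above, its exact contract is not shown; the plain-Java sketch below assumes a function that maps each record to exactly one label, and groups records by that label while preserving arrival order. `SplitSketch` and the threshold rule are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of label-based splitting (not the dataStream API).
class SplitSketch {

    // Route each record to the sub-stream named by its label; order is preserved within each label.
    static <T> Map<String, List<T>> split(List<T> records, Function<T, String> labeler) {
        return records.stream()
                .collect(Collectors.groupingBy(labeler, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Integer> readings = List.of(5, 42, 17, 99);
        Map<String, List<Integer>> parts = split(readings, v -> v > 30 ? "high" : "low");
        System.out.println(parts); // {low=[5, 17], high=[42, 99]}
    }
}
```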
The with operator specifies strategies used during the computation, such as the checkpoint storage strategy and the state storage strategy.
dataStream.fromMqtt("", "", "", "", "")
    .flatMap(message -> ((JSONObject) message).getJSONArray("Data"))
    .window(TumblingWindow.of(Time.minutes(1)))
    .groupBy("AttributeCode")
    .setLocalStorageOnly(true)
    .avg("Value", "avg_value")
    .toDataSteam()
    .toPrint()
    .with(ShuffleStrategy.shuffleWithMemory())
    .start();