| package org.apache.storm.beam.translation.runtime; |
| |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.storm.task.OutputCollector; |
| import org.apache.storm.task.TopologyContext; |
| import org.apache.storm.topology.OutputFieldsDeclarer; |
| import org.apache.storm.topology.base.BaseRichBolt; |
| import org.apache.storm.tuple.Fields; |
| import org.apache.storm.tuple.Tuple; |
| import org.apache.storm.tuple.Values; |
| |
| import java.util.Map; |
| |
| /** |
| * |
| */ |
| public class GroupByKeyCompleteBolt extends BaseRichBolt { |
| private OutputCollector collector; |
| |
| |
| @Override |
| public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { |
| this.collector = collector; |
| } |
| |
| public GroupByKeyCompleteBolt() { |
| } |
| |
| @Override |
| public void execute(Tuple input) { |
| this.collector.emit(input, new Values(input.getValueByField("windowedValue"))); |
| this.collector.ack(input); |
| } |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields("value")); |
| } |
| } |