blob: 455d8a2b54f315a5372eef24e6bfde1b8d9993e5 [file] [log] [blame]
package org.apache.storm.beam.translation;
import avro.shaded.com.google.common.collect.Lists;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PValue;
import org.apache.storm.beam.translation.runtime.GroupByKeyCompleteBolt;
import org.apache.storm.beam.translation.runtime.GroupByKeyInitBolt;
import java.util.ArrayList;
import java.util.List;
/**
 * Translates a Beam {@link GroupByKey} operation into a pair of Storm bolts with a fields grouping.
 *
 * <p>For each translated node this registers, on the supplied {@link TranslationContext}:
 * <pre>
 *   from --(SHUFFLE)--> from_GBK_init --(FIELDS on "keyValue")--> from_GBK_complete --(SHUFFLE)--> to
 * </pre>
 * where {@code from} and {@code to} are the names of the transform's input and output values with
 * their last {@code .}-delimited suffix removed.
 *
 * <p>TODO: From a Beam perspective this is likely the wrong approach to implementing GBK.
 *
 * @param <K> key type of the grouped elements
 * @param <V> value type of the grouped elements
 */
public class GroupByKeyTranslator<K, V> implements
        TransformTranslator<GroupByKey<K, V>> {

    /** Field name passed as the argument of the FIELDS grouping between the init and complete bolts. */
    private static final String GROUPING_FIELD = "keyValue";

    /**
     * Adds the GBK init/complete bolts and the three streams connecting them to the topology
     * being built in {@code context}.
     *
     * @param transform the Beam GroupByKey transform (not read directly; input/output come from
     *                  {@code context.getCurrentTransform()})
     * @param context   translation context that receives the new bolts and streams
     */
    @Override
    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
        PValue pvFrom = (PValue) context.getCurrentTransform().getInput();
        PValue pvTo = (PValue) context.getCurrentTransform().getEnclosingNode().getOutput();
        String from = baseName(pvFrom.getName());
        String to = baseName(pvTo.getName());
        context.activateGBK(to);

        // Bolt ids are derived from the upstream component name.
        String initBolt = from + "_GBK_init"; // first GBK bolt
        String completeBolt = from + "_GBK_complete";

        // from --> initBolt
        context.addStream(new TranslationContext.Stream(from, initBolt, shuffleGrouping()));
        context.addBolt(initBolt, new GroupByKeyInitBolt());

        // initBolt --> completeBolt, grouped on GROUPING_FIELD (presumably maps to a Storm
        // fields grouping so that tuples with equal field values reach the same task — confirm).
        TranslationContext.Grouping fieldsGrouping =
                new TranslationContext.Grouping(TranslationContext.Grouping.Type.FIELDS);
        // NOTE(review): kept as a raw List because the element type expected by
        // Grouping.setArgs is not visible here; parameterize once that signature is confirmed.
        List fields = new ArrayList();
        fields.add(GROUPING_FIELD);
        fieldsGrouping.setArgs(fields);
        context.addBolt(completeBolt, new GroupByKeyCompleteBolt());
        context.addStream(new TranslationContext.Stream(initBolt, completeBolt, fieldsGrouping));

        // completeBolt --> to
        context.addStream(new TranslationContext.Stream(completeBolt, to, shuffleGrouping()));
    }

    /** Creates a new SHUFFLE grouping; a fresh instance is returned for each stream. */
    private static TranslationContext.Grouping shuffleGrouping() {
        return new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE);
    }

    /**
     * Strips the last {@code .}-delimited suffix from a value name.
     *
     * @param name the name to shorten
     * @return {@code name} up to (not including) its last {@code '.'}, or {@code name} unchanged
     *         when it contains no {@code '.'} (previously this threw
     *         {@code StringIndexOutOfBoundsException})
     */
    private static String baseName(String name) {
        int dot = name.lastIndexOf('.');
        return dot < 0 ? name : name.substring(0, dot);
    }
}