STORM-2174: Initial commit for beam runner.
diff --git a/external/storm-beam/pom.xml b/external/storm-beam/pom.xml
new file mode 100644
index 0000000..4654005
--- /dev/null
+++ b/external/storm-beam/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>storm-beam</artifactId>
+ <packaging>jar</packaging>
+
+
+ <name>Apache Storm Beam Runner</name>
+ <properties>
+ <beam.version>0.2.0-incubating-SNAPSHOT</beam.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-java</artifactId>
+ <version>${beam.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${beam.version}</version>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>1.0-rc2</version>
+ <optional>true</optional>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java
new file mode 100644
index 0000000..1c87e54
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java
@@ -0,0 +1,109 @@
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A demo {@code UnboundedSource} that cycles through a small fixed set of
+ * sentences forever. Despite the name, the output is deterministic
+ * (round-robin over three strings), not random. It exists only to feed the
+ * word-count example while the Storm runner is being developed.
+ */
+public class RandomSentenceSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
+
+    // Coder used by Beam to encode the emitted String elements.
+    private final Coder<String> coder;
+
+    public RandomSentenceSource(Coder<String> coder){
+        this.coder = coder;
+    }
+
+    // This toy source never splits: it always runs as a single instance.
+    @Override
+    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
+        return Collections.singletonList(this);
+    }
+
+    // The checkpoint mark is ignored; the reader always restarts from the
+    // beginning of the sentence list.
+    @Override
+    public UnboundedReader<String> createReader(PipelineOptions pipelineOptions, @Nullable CheckpointMark checkpointMark) throws IOException {
+        return new RandomSentenceReader(this);
+    }
+
+    // NOTE(review): checkpointing is not implemented, so no coder is available.
+    @Nullable
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+        return null;
+    }
+
+    @Override
+    public void validate() {
+        // Nothing to validate: this source takes no user configuration.
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+        return this.coder;
+    }
+
+
+
+    /**
+     * Reader that loops over the fixed sentence array; advance() always
+     * succeeds, wrapping back to index 0 at the end of the array.
+     */
+    public static class RandomSentenceReader extends UnboundedReader<String> {
+
+        private String[] values = {"blah blah blah", "foo bar", "my dog has fleas"};
+        private int index = 0;
+        private final UnboundedSource<String, CheckpointMark> source;
+
+        public RandomSentenceReader(UnboundedSource<String, CheckpointMark> source){
+            this.source = source;
+        }
+
+
+        @Override
+        public boolean start() throws IOException {
+            index = 0;
+            return true;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+            index++;
+            if(index == values.length){
+                index = 0;
+            }
+            return true;
+        }
+
+        // The watermark tracks wall-clock time, so event-time windows close in
+        // real time.
+        @Override
+        public Instant getWatermark() {
+            return Instant.now();
+        }
+
+        // NOTE(review): null means no checkpointing support yet.
+        @Override
+        public CheckpointMark getCheckpointMark() {
+            return null;
+        }
+
+        @Override
+        public UnboundedSource<String, ?> getCurrentSource() {
+            return this.source;
+        }
+
+        @Override
+        public String getCurrent() throws NoSuchElementException {
+            return values[index];
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() throws NoSuchElementException {
+            return Instant.now();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // No resources to release.
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java
new file mode 100644
index 0000000..8c710ab
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java
@@ -0,0 +1,12 @@
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Pipeline options accepted by the Storm runner. Currently a placeholder that
+ * adds nothing beyond the standard Beam {@code PipelineOptions} and
+ * {@code ApplicationNameOptions}; Storm-specific settings belong here.
+ */
+public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions {
+
+
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java
new file mode 100644
index 0000000..0e10271
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java
@@ -0,0 +1,31 @@
+package org.apache.storm.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * AutoService registrars that make the Storm runner and its pipeline options
+ * discoverable through Beam's ServiceLoader-based registration mechanism.
+ */
+public class StormRegistrar {
+    // Utility holder; never instantiated.
+    private StormRegistrar(){}
+
+    /** Registers {@code StormRunner} as an available PipelineRunner. */
+    @AutoService(PipelineRunnerRegistrar.class)
+    public static class Runner implements PipelineRunnerRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+            return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+                    StormRunner.class);
+        }
+    }
+
+    /** Registers {@code StormPipelineOptions} so the options factory can find it. */
+    @AutoService(PipelineOptionsRegistrar.class)
+    public static class Options implements PipelineOptionsRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+            return ImmutableList.<Class<? extends PipelineOptions>>of(
+                    StormPipelineOptions.class);
+        }
+    }
+
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java
new file mode 100644
index 0000000..cc92349
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java
@@ -0,0 +1,170 @@
+package org.apache.storm.beam;
+
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.beam.translation.StormPipelineTranslator;
+import org.apache.storm.beam.translation.TranslationContext;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Main entry point into the Storm Runner.
+ *
+ * After reading the user defined pipeline, Beam will invoke the run() method with a representation
+ * of the pipeline.
+ *
+ * TODO: Only supports storm local mode for now.
+ */
+public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class);
+
+    private StormPipelineOptions options;
+
+    public StormRunner(StormPipelineOptions options){
+        this.options = options;
+    }
+
+    /**
+     * Factory used by Beam's runner registration; validates that the supplied
+     * options are usable as {@link StormPipelineOptions}.
+     */
+    public static StormRunner fromOptions(PipelineOptions options){
+        StormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(StormPipelineOptions.class, options);
+        return new StormRunner(pipelineOptions);
+    }
+
+    /**
+     * Translates the Beam pipeline into a Storm topology and runs it on an
+     * in-process LocalCluster.
+     *
+     * @return a (currently mostly empty) result object. Previously this
+     *         returned null, which violates the PipelineRunner contract and
+     *         would NPE any caller that inspects the result.
+     */
+    @Override
+    public StormPipelineResult run(Pipeline pipeline) {
+        LOG.info("Running pipeline...");
+        TranslationContext context = new TranslationContext(this.options);
+        StormPipelineTranslator transformer = new StormPipelineTranslator(context);
+        transformer.translate(pipeline);
+
+        for(TranslationContext.Stream stream : context.getStreams()){
+            LOG.info("{} --> {}", stream.getFrom(), stream.getTo());
+        }
+
+        runTopologyLocal(getTopology(context));
+        return new StormPipelineResult();
+    }
+
+    // Submits the topology to an in-process LocalCluster and lets it run for a
+    // fixed interval. TODO: replace the sleep with proper lifecycle management
+    // (cancel / waitUntilFinish) and shut the cluster down when done.
+    private void runTopologyLocal(StormTopology topology){
+        Config conf = new Config();
+        conf.setMaxSpoutPending(1000);
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, topology);
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status instead of swallowing it with
+            // printStackTrace(), so callers can observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while waiting for local topology to run", e);
+        }
+    }
+
+    /**
+     * Placeholder PipelineResult. State tracking and aggregator retrieval are
+     * not implemented yet; getState() returns null until wired up.
+     */
+    public static class StormPipelineResult implements PipelineResult {
+        private State state;
+
+        public State getState() {
+            return this.state;
+        }
+
+        public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+            return null;
+        }
+    }
+
+    /**
+     * Builds a StormTopology from the spouts, bolts and streams collected in
+     * the translation context.
+     */
+    private StormTopology getTopology(TranslationContext context){
+        TopologyBuilder builder = new TopologyBuilder();
+        for (Map.Entry<String, IRichSpout> entry : context.getSpouts().entrySet()) {
+            builder.setSpout(entry.getKey(), entry.getValue());
+        }
+
+        HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
+        for (TranslationContext.Stream stream : context.getStreams()) {
+            String to = stream.getTo();
+            Object boltObj = context.getBolt(to);
+            BoltDeclarer declarer = declarers.get(to);
+            if (declarer == null) {
+                // First time this bolt id is seen: declare it with the
+                // TopologyBuilder overload matching its Storm bolt interface.
+                declarer = declareBolt(builder, to, boltObj);
+                declarers.put(to, declarer);
+            } else if (!isBolt(boltObj)) {
+                // Preserve original behavior: a non-bolt object is rejected
+                // even when a declarer already exists for this id.
+                throw new IllegalArgumentException("Class does not appear to be a bolt: " +
+                        boltObj.getClass().getName());
+            }
+
+            TranslationContext.Grouping grouping = stream.getGrouping();
+            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
+            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
+
+            switch (grouping.getType()) {
+                case SHUFFLE:
+                    declarer.shuffleGrouping(stream.getFrom(), streamId);
+                    break;
+                case FIELDS:
+                    //TODO check for null grouping args
+                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
+                    break;
+                case ALL:
+                    declarer.allGrouping(stream.getFrom(), streamId);
+                    break;
+                case DIRECT:
+                    declarer.directGrouping(stream.getFrom(), streamId);
+                    break;
+                case GLOBAL:
+                    declarer.globalGrouping(stream.getFrom(), streamId);
+                    break;
+                case LOCAL_OR_SHUFFLE:
+                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
+                    break;
+                case NONE:
+                    declarer.noneGrouping(stream.getFrom(), streamId);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+            }
+        }
+
+        return builder.createTopology();
+    }
+
+    // True if the object implements one of Storm's bolt interfaces.
+    private static boolean isBolt(Object boltObj) {
+        return boltObj instanceof IRichBolt || boltObj instanceof IBasicBolt
+                || boltObj instanceof IWindowedBolt || boltObj instanceof IStatefulBolt;
+    }
+
+    // Declares the bolt using the setBolt overload matching its interface;
+    // throws for objects that are not bolts at all.
+    private static BoltDeclarer declareBolt(TopologyBuilder builder, String id, Object boltObj) {
+        if (boltObj instanceof IRichBolt) {
+            return builder.setBolt(id, (IRichBolt) boltObj);
+        } else if (boltObj instanceof IBasicBolt) {
+            return builder.setBolt(id, (IBasicBolt) boltObj);
+        } else if (boltObj instanceof IWindowedBolt) {
+            return builder.setBolt(id, (IWindowedBolt) boltObj);
+        } else if (boltObj instanceof IStatefulBolt) {
+            return builder.setBolt(id, (IStatefulBolt) boltObj);
+        }
+        throw new IllegalArgumentException("Class does not appear to be a bolt: " +
+                boltObj.getClass().getName());
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java
new file mode 100644
index 0000000..b98c1aa
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * A minimal word count pipeline using the Beam API, running on top of Storm.
+ *
+ * When the Storm Runner is reasonably complete, running this pipeline in Storm
+ * should yield the same output as running it on the Beam DirectRunner.
+ */
+public class StormWordCount {
+
+    /** Splits each input line into words, counting empty lines in an aggregator. */
+    static class ExtractWordsFn extends DoFn<String, String> {
+        private final Aggregator<Long, Long> emptyLines =
+                createAggregator("emptyLines", new Sum.SumLongFn());
+
+        @Override
+        public void processElement(ProcessContext c) {
+            if (c.element().trim().isEmpty()) {
+                emptyLines.addValue(1L);
+            }
+
+            // Split the line into words.
+            String[] words = c.element().split("[^a-zA-Z']+");
+
+            // Output each word encountered into the output PCollection.
+            for (String word : words) {
+                if (!word.isEmpty()) {
+                    // NOTE(review): stdout print left in for manual runner debugging.
+                    System.out.println(word);
+                    c.output(word);
+                }
+            }
+        }
+    }
+
+    /**
+     * A SimpleFunction that converts a Word and Count into a printable string.
+     */
+    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+        @Override
+        public String apply(KV<String, Long> input) {
+            String retval = input.getKey() + ": " + input.getValue();
+            // NOTE(review): stdout print left in for manual runner debugging.
+            System.out.println(retval);
+            return retval;
+        }
+    }
+
+    /**
+     * A PTransform that converts a PCollection containing lines of text into a PCollection of
+     * formatted word counts.
+     */
+    public static class CountWords extends PTransform<PCollection<String>,
+            PCollection<KV<String, Long>>> {
+        @Override
+        public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+            // Convert lines of text into individual words.
+            PCollection<String> words = lines.apply(
+                    ParDo.of(new ExtractWordsFn()));
+
+            // Count the number of times each word occurs.
+            PCollection<KV<String, Long>> wordCounts =
+                    words.apply(Count.<String>perElement());
+
+            return wordCounts;
+        }
+    }
+
+    /**
+     * Options supported by {@link StormWordCount}.
+     *
+     * <p>Inherits standard configuration options.
+     */
+    public interface WordCountOptions extends PipelineOptions {
+
+    }
+
+    /**
+     * Builds the pipeline (demo source -> 2-second fixed windows -> extract
+     * words -> count) and runs it with whatever runner the options select.
+     */
+    public static void main(String[] args) {
+        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+                .as(WordCountOptions.class);
+        Pipeline p = Pipeline.create(options);
+        p.apply("Spout", Read.from(new RandomSentenceSource(StringUtf8Coder.of())))
+                .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardSeconds(2))))
+                .apply("ExtractWords", ParDo.of(new ExtractWordsFn()))
+                .apply(new CountWords());
+
+        p.run();
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..455d8a2
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
@@ -0,0 +1,58 @@
+package org.apache.storm.beam.translation;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.GroupByKeyCompleteBolt;
+import org.apache.storm.beam.translation.runtime.GroupByKeyInitBolt;
+
+import java.util.List;
+
+/**
+ * Translates a Beam GroupByKey operation into a pair of Storm Bolts with a fields grouping.
+ *
+ * TODO: From a Beam perspective this is likely the wrong approach to doing GBK
+ */
+public class GroupByKeyTranslator<K, V> implements
+        TransformTranslator<GroupByKey<K, V>> {
+    @Override
+    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+        PValue pvFrom = (PValue) context.getCurrentTransform().getInput();
+        PValue pvTo = (PValue) context.getCurrentTransform().getEnclosingNode().getOutput();
+
+        String from = baseName(pvFrom.getName());
+        String to = baseName(pvTo.getName());
+        // Mark the GBK as in-flight so the downstream ParDo translator reads
+        // from the "complete" bolt instead of the original upstream node.
+        context.activateGBK(to);
+        String initBolt = from + "_GBK_init"; // first GBK bolt
+        String completeBolt = from + "_GBK_complete";
+
+        GroupByKeyInitBolt gbkInit = new GroupByKeyInitBolt();
+        GroupByKeyCompleteBolt gbkComplete = new GroupByKeyCompleteBolt();
+
+        // from --> initBolt (shuffle)
+        context.addStream(new TranslationContext.Stream(from, initBolt,
+                new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)));
+        context.addBolt(initBolt, gbkInit);
+
+        // initBolt --> completeBolt, fields-grouped on "keyValue" so all values
+        // for a key land on the same GroupByKeyCompleteBolt task.
+        // (Replaces the previous use of avro's shaded Guava Lists, which leaked
+        // an avro implementation detail into runner code, and the raw List type.)
+        List<String> fields = new java.util.ArrayList<String>();
+        fields.add("keyValue");
+        TranslationContext.Grouping fieldsGrouping =
+                new TranslationContext.Grouping(TranslationContext.Grouping.Type.FIELDS);
+        fieldsGrouping.setArgs(fields);
+        context.addBolt(completeBolt, gbkComplete);
+        context.addStream(new TranslationContext.Stream(initBolt, completeBolt, fieldsGrouping));
+
+        // completeBolt --> to (shuffle)
+        context.addStream(new TranslationContext.Stream(completeBolt, to,
+                new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)));
+    }
+
+    /**
+     * Strips the trailing ".suffix" from a PValue name (e.g. "Foo.out" -> "Foo").
+     * Returns the name unchanged when it contains no dot, instead of throwing
+     * StringIndexOutOfBoundsException as substring(0, -1) previously did.
+     */
+    private static String baseName(String str){
+        int idx = str.lastIndexOf('.');
+        return idx < 0 ? str : str.substring(0, idx);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..671b84d
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.DoFnBolt;
+import org.apache.storm.beam.translation.util.DefaultSideInputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a ParDo.Bound to a Storm DoFnBolt
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+        TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+    @Override
+    public void translateNode(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getFn();
+        PCollection<OutputT> output = context.getOutput();
+        WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
+
+        DoFnBolt<InputT, OutputT> bolt = new DoFnBolt<>(context.getOptions(), doFn,
+                windowingStrategy, new DefaultSideInputReader());
+
+        PValue pvFrom = (PValue) context.getCurrentTransform().getInput();
+        String from = baseName(pvFrom.getName());
+        // If a GroupByKey was just translated, this ParDo consumes the GBK's
+        // "complete" bolt rather than the original upstream node.
+        if (context.isGBKActive()) {
+            from = context.completeGBK();
+        }
+        LOG.info(baseName(pvFrom.getName()));
+
+        PValue pvTo = (PValue) context.getCurrentTransform().getOutput();
+        LOG.info(baseName(pvTo.getName()));
+        String to = baseName(pvTo.getName());
+
+        TranslationContext.Stream stream = new TranslationContext.Stream(from, to,
+                new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+
+        context.addStream(stream);
+        context.addBolt(to, bolt);
+    }
+
+    /**
+     * Strips the trailing ".suffix" from a PValue name; returns the input
+     * unchanged when it contains no dot (the previous version threw
+     * StringIndexOutOfBoundsException via substring(0, -1) in that case).
+     */
+    private static String baseName(String str){
+        int idx = str.lastIndexOf('.');
+        return idx < 0 ? str : str.substring(0, idx);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
new file mode 100644
index 0000000..d3dd478
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline visitor that walks the Beam transform tree and dispatches each
+ * primitive transform to a registered {@code TransformTranslator}, which
+ * records the corresponding Storm spouts/bolts/streams in the
+ * {@code TranslationContext}.
+ */
+public class StormPipelineTranslator implements Pipeline.PipelineVisitor{
+    private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+    // Accumulates the Storm topology pieces produced during traversal.
+    private TranslationContext context;
+
+    public StormPipelineTranslator(TranslationContext context){
+        this.context = context;
+    }
+
+
+    /** Entry point: walks the pipeline, populating the translation context. */
+    public void translate(Pipeline pipeline) {
+        pipeline.traverseTopologically(this);
+    }
+
+    // Composites are always entered so their primitive children are visited
+    // individually; no composite-level translation is attempted yet.
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("entering composite translation {}", transformTreeNode.getTransform());
+        return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    public void leaveCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("leaving composite translation {}", transformTreeNode.getTransform());
+    }
+
+    // Looks up a translator by the transform's concrete class; while the runner
+    // is incomplete, unsupported transforms are only logged, not failed.
+    public void visitPrimitiveTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("visiting transform {}", transformTreeNode.getTransform());
+        PTransform transform = transformTreeNode.getTransform();
+        LOG.info("class: {}", transform.getClass());
+        TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+        if(translator != null) {
+            context.setCurrentTransform(transformTreeNode);
+            translator.translateNode(transformTreeNode.getTransform(), context);
+        } else {
+            LOG.warn("No translator found for {}", transform.getClass());
+        }
+    }
+
+    public void visitValue(PValue value, TransformTreeNode transformTreeNode) {
+        LOG.info("visiting value {}", value);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
new file mode 100644
index 0000000..cb31060
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
@@ -0,0 +1,11 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
+ *
+ * @param <Type> the concrete PTransform subtype this translator handles
+ */
+public interface TransformTranslator <Type extends PTransform>{
+
+    // Records the Storm spouts/bolts/streams for the given transform in the context.
+    void translateNode(Type transform, TranslationContext context);
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
new file mode 100644
index 0000000..b3920c0
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
@@ -0,0 +1,186 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.topology.IRichSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains the state necessary during Pipeline translation to build a Storm topology:
+ * spouts and bolts keyed by component id, the streams (edges) connecting them,
+ * the transform currently being visited, and bookkeeping for an in-flight
+ * GroupByKey translation.
+ */
+public class TranslationContext {
+    private StormPipelineOptions options;
+
+    // Node currently being translated; set by StormPipelineTranslator before
+    // each TransformTranslator runs.
+    private TransformTreeNode currentTransform;
+
+    // Spouts keyed by topology component id.
+    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
+
+    // Bolts keyed by component id; Object-typed because Storm's bolt
+    // interfaces (IRichBolt, IBasicBolt, ...) share no common supertype.
+    private Map<String, Object> boltMap = new HashMap<String, Object>();
+
+    // Topology edges, in insertion order.
+    private List<Stream> streams = new ArrayList<Stream>();
+
+    public TranslationContext(StormPipelineOptions options){
+        this.options = options;
+
+    }
+
+    // Destination component of an in-flight GroupByKey, or null when none is active.
+    private String gbkTo = null;
+
+    public StormPipelineOptions getOptions(){
+        return this.options;
+    }
+
+    public void addSpout(String id, IRichSpout spout){
+        this.spoutMap.put(id, spout);
+    }
+
+    public Map<String, IRichSpout> getSpouts(){
+        return this.spoutMap;
+    }
+
+    public void addBolt(String id, Object bolt){
+        this.boltMap.put(id, bolt);
+    }
+
+    public Object getBolt(String id){
+        return this.boltMap.get(id);
+    }
+
+    public void addStream(Stream stream){
+        this.streams.add(stream);
+    }
+    public List<Stream> getStreams(){
+        return this.streams;
+    }
+
+    public void setCurrentTransform(TransformTreeNode transform){
+        this.currentTransform = transform;
+    }
+
+    public TransformTreeNode getCurrentTransform(){
+        return this.currentTransform;
+    }
+
+    // Unchecked cast: callers are expected to know the input type of the
+    // transform they are translating.
+    public <InputT extends PInput> InputT getInput() {
+        return (InputT) getCurrentTransform().getInput();
+    }
+
+    // Unchecked cast, same caveat as getInput().
+    public <OutputT extends POutput> OutputT getOutput() {
+        return (OutputT) getCurrentTransform().getOutput();
+    }
+
+    /** Marks a GroupByKey translation as in progress, targeting the given node name. */
+    public void activateGBK(String gbkTo){
+        this.gbkTo = gbkTo;
+    }
+
+    /** Returns the pending GBK target and clears the in-progress flag. */
+    public String completeGBK(){
+        String gbkTo = this.gbkTo;
+        this.gbkTo = null;
+        return gbkTo;
+    }
+
+    public boolean isGBKActive(){
+        return this.gbkTo != null;
+    }
+
+
+
+    /** An edge in the topology: a stream from one named component to another. */
+    public static class Stream {
+
+        private String from;
+        private String to;
+        private Grouping grouping;
+
+        public Stream(String from, String to, Grouping grouping){
+            this.from = from;
+            this.to = to;
+            this.grouping = grouping;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public Grouping getGrouping() {
+            return grouping;
+        }
+
+        public void setGrouping(Grouping grouping) {
+            this.grouping = grouping;
+        }
+    }
+
+    /** How tuples on a stream are partitioned among the consuming bolt's tasks. */
+    public static class Grouping {
+
+        /**
+         * Types of stream groupings Storm allows
+         */
+        public static enum Type {
+            ALL,
+            CUSTOM,
+            DIRECT,
+            SHUFFLE,
+            LOCAL_OR_SHUFFLE,
+            FIELDS,
+            GLOBAL,
+            NONE
+        }
+
+        private Type type;
+        private String streamId; // for named streams, other than DEFAULT
+        private List<String> args; // arguments for fields grouping
+
+
+        public Grouping(Type type){
+            this.type = type;
+        }
+
+        /** Convenience constructor for a FIELDS grouping on the given fields. */
+        public Grouping(List<String> args){
+            this.type = Type.FIELDS;
+            this.args = args;
+        }
+        public List<String> getArgs() {
+            return args;
+        }
+
+        public void setArgs(List<String> args) {
+            this.args = args;
+        }
+
+        public Type getType() {
+            return type;
+        }
+
+        public void setType(Type type) {
+            this.type = type;
+        }
+
+        public String getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(String streamId) {
+            this.streamId = streamId;
+        }
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..bdb42f8
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */
+public class TranslatorRegistry {
+    // Keyed by the exact PTransform class; no inheritance-aware lookup is done.
+    // Typed HashMap construction replaces the previous raw `new HashMap()`,
+    // which produced an unchecked-assignment warning.
+    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
+            new HashMap<Class<? extends PTransform>, TransformTranslator>();
+
+    static {
+        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+        TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+        TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslator<>());
+        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator<>());
+    }
+
+    /**
+     * Returns the translator registered for the given transform's concrete
+     * class, or {@code null} if the transform is not supported.
+     */
+    static TransformTranslator<?> getTranslator(
+            PTransform<?, ?> transform) {
+        return TRANSLATORS.get(transform.getClass());
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..c177a89
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java
@@ -0,0 +1,21 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.translation.runtime.UnboundedSourceSpout;
+
+/**
+ * Translates a Read.Unbounded transform into a Storm spout registered under
+ * the transform's full name.
+ *
+ * @param <T> element type produced by the unbounded source
+ */
+public class UnboundedSourceTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+    @Override
+    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+        UnboundedSource<T, ?> source = transform.getSource();
+        StormPipelineOptions options = context.getOptions();
+        UnboundedSourceSpout spout = new UnboundedSourceSpout(source, options);
+
+        // Register the spout under the transform's full name so downstream
+        // translators can wire streams to it by name.
+        String name = context.getCurrentTransform().getFullName();
+        context.addSpout(name, spout);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
new file mode 100644
index 0000000..be6d1f9
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.WindowBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a Window.Bound node into a Storm WindowedBolt
+ * @param <T>
+ */
+public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+
+ @Override
+ public void translateNode(Window.Bound<T> transform, TranslationContext context) {
+ if(transform.getWindowFn() instanceof FixedWindows){
+ Duration size = ((FixedWindows) transform.getWindowFn()).getSize();
+
+ WindowBolt bolt = new WindowBolt();
+ bolt.withTumblingWindow(WindowBolt.Duration.seconds((int)size.getStandardSeconds()));
+
+ PValue from = (PValue)context.getCurrentTransform().getInput();
+ LOG.info(baseName(from.getName()));
+
+ PValue to = (PValue)context.getCurrentTransform().getOutput();
+ LOG.info(baseName(to.getName()));
+
+ TranslationContext.Stream stream = new TranslationContext.Stream(baseName(from.getName()), baseName(to.getName()), new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+
+ context.addStream(stream);
+ context.addBolt(baseName(to.getName()), bolt);
+
+ } else {
+ throw new UnsupportedOperationException("Currently only fixed windows are supported.");
+ }
+ }
+
+
+ private static String baseName(String str){
+ return str.substring(0, str.lastIndexOf("."));
+ }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
new file mode 100644
index 0000000..3a166e7
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
@@ -0,0 +1,102 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.api.client.util.Lists;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.translation.util.DefaultStepContext;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.*;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm bolt that executes a Beam DoFn over incoming WindowedValues.
+ *
+ * Accepts either a single WindowedValue or a List of them per tuple, runs the
+ * DoFn bundle, and emits the collected outputs as one list on the "value" field.
+ */
+public class DoFnBolt<InputT, OutputT> extends BaseRichBolt implements DoFnRunners.OutputManager {
+
+    /** Runner driving the wrapped DoFn; not serializable, rebuilt in prepare(). */
+    private transient DoFnRunner<InputT, OutputT> runner = null;
+
+    /** Main output tag; the OutputManager callback rejects any other tag. */
+    private final TupleTag<OutputT> tupleTag = new TupleTag<OutputT>() {};
+
+    private transient OutputCollector collector;
+
+    /** Outputs collected from the DoFn for the current input tuple. */
+    private List<WindowedValue<OutputT>> output = Lists.newArrayList();
+
+    /** Serialized pipeline options so the bolt can ship to the cluster. */
+    private SerializedPipelineOptions serializedOptions;
+    private transient StormPipelineOptions pipelineOptions;
+
+    private DoFn<InputT, OutputT> doFn;
+    private WindowingStrategy<?, ?> windowingStrategy;
+    private SideInputReader sideInputReader;
+
+    public DoFnBolt(
+            StormPipelineOptions pipelineOptions,
+            DoFn<InputT, OutputT> doFn,
+            WindowingStrategy<?, ?> windowingStrategy,
+            SideInputReader sideInputReader) {
+        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+        this.doFn = doFn;
+        this.windowingStrategy = windowingStrategy;
+        this.sideInputReader = sideInputReader;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+        // NOTE(review): placeholder counter, not surfaced anywhere — TODO wire to Storm metrics.
+        Counter<Integer> counter = Counter.ints("foo", Counter.AggregationKind.SUM);
+        CounterSet counters = new CounterSet(counter);
+
+        this.runner = new StormDoFnRunner(this.pipelineOptions, this.doFn, this.sideInputReader, this,
+                this.tupleTag, TupleTagList.empty().getAll(), new DefaultStepContext(),
+                counters.getAddCounterMutator(), this.windowingStrategy);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        Object value = input.getValue(0);
+        this.output = Lists.newArrayList();
+        this.runner.startBundle();
+        if (value instanceof List) {
+            // Upstream components may batch multiple WindowedValues into one tuple.
+            for (Object o : (List) value) {
+                this.runner.processElement((WindowedValue) o);
+            }
+        } else {
+            this.runner.processElement((WindowedValue) value);
+        }
+        this.runner.finishBundle();
+
+        // Anchor on the input tuple so downstream failures are replayed from the spout.
+        this.collector.emit(input, new Values(this.output));
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+
+    /** OutputManager callback: collect main-output values; any other tag is an error. */
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> windowedValue) {
+        if (this.tupleTag.equals(tag)) {
+            this.output.add((WindowedValue<OutputT>) windowedValue);
+        } else {
+            throw new RuntimeException("Wrong tag");
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
new file mode 100644
index 0000000..d541709
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
@@ -0,0 +1,40 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class GroupByKeyCompleteBolt extends BaseRichBolt {
+ private OutputCollector collector;
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ public GroupByKeyCompleteBolt() {
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ this.collector.emit(input, new Values(input.getValueByField("windowedValue")));
+ this.collector.ack(input);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("value"));
+ }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
new file mode 100644
index 0000000..aaba57e
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Splits a list of windowed KVs into individual (key, windowedValue) pairs so
+ * Storm can group them by key downstream. Emits on fields
+ * ("keyValue", "windowedValue"), anchored on the input tuple.
+ */
+public class GroupByKeyInitBolt extends BaseRichBolt {
+
+    private OutputCollector collector;
+
+    public GroupByKeyInitBolt() {
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        List<WindowedValue<KV>> values = (List<WindowedValue<KV>>) input.getValue(0);
+        for (WindowedValue<KV> value : values) {
+            KV kv = value.getValue();
+            this.collector.emit(input, new Values(kv.getKey(), value));
+        }
+        // Ack exactly once per input tuple. The previous version acked inside the
+        // loop, double-acking the tuple whenever the list held more than one value.
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("keyValue", "windowedValue"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
new file mode 100644
index 0000000..632fdbf
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
@@ -0,0 +1,27 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable bridge over Beam's SimpleDoFnRunner so the runner can be
+ * referenced from Storm components.
+ *
+ * NOTE(review): declared with raw generic types throughout; presumably should
+ * be parameterized as {@code StormDoFnRunner<InputT, OutputT>} — confirm
+ * against SimpleDoFnRunner's signature before tightening.
+ */
+public class StormDoFnRunner extends SimpleDoFnRunner implements Serializable {
+ // Passes every collaborator straight through to the SimpleDoFnRunner constructor.
+ public StormDoFnRunner(PipelineOptions options,
+ DoFn fn,
+ SideInputReader sideInputReader,
+ DoFnRunners.OutputManager outputManager,
+ TupleTag mainOutputTag, List sideOutputTags,
+ ExecutionContext.StepContext stepContext,
+ CounterSet.AddCounterMutator addCounterMutator,
+ WindowingStrategy windowingStrategy) {
+ super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
+ addCounterMutator, windowingStrategy);
+ }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
new file mode 100644
index 0000000..349576e
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
@@ -0,0 +1,109 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Spout implementation that wraps a Beam UnboundedSource
+ */
+public class UnboundedSourceSpout extends BaseRichSpout{
+ private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+ private UnboundedSource source;
+ private transient UnboundedSource.UnboundedReader reader;
+ private SerializedPipelineOptions serializedOptions;
+ private transient StormPipelineOptions pipelineOptions;
+
+ private SpoutOutputCollector collector;
+
+ public UnboundedSourceSpout(UnboundedSource source, StormPipelineOptions options){
+ this.source = source;
+ this.serializedOptions = new SerializedPipelineOptions(options);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ this.reader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void activate() {
+ super.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ super.deactivate();
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ super.ack(msgId);
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ super.fail(msgId);
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ return super.getComponentConfiguration();
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("value"));
+
+ }
+
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ try {
+
+ this.collector = collector;
+ this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+ this.reader = this.source.createReader(this.pipelineOptions, null);
+ this.reader.start();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create unbounded reader.", e);
+ }
+
+ }
+
+ public void nextTuple() {
+ try {
+ if(this.reader.advance()){
+ Object value = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+ Instant watermark = reader.getWatermark();
+
+ WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+ collector.emit(new Values(wv), UUID.randomUUID());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Exception reading values from source.", e);
+ }
+ }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
new file mode 100644
index 0000000..4e8a7fb
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
@@ -0,0 +1,48 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Windowed bolt that collapses all tuples in a window into a single list and
+ * emits it as one tuple on the "window" field, anchored on every tuple in the
+ * window. Window size/type is configured by the translator via
+ * withTumblingWindow().
+ */
+public class WindowBolt extends BaseWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBolt.class);
+
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        List<Tuple> tuples = inputWindow.get();
+        LOG.debug("Executing window with {} tuple(s)", tuples.size());
+
+        // Collect each tuple's first value into one list and emit it as a single
+        // tuple, anchored on all tuples of the window for replay tracking.
+        List<Object> values = Lists.newArrayList();
+        for (Tuple t : tuples) {
+            values.add(t.getValue(0));
+        }
+        collector.emit(tuples, new Values(values));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("window"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
new file mode 100644
index 0000000..bea9e8d
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * No-op SideInputReader implementation.
+ *
+ * Used where no side inputs exist: every lookup yields {@code null} and the
+ * reader reports itself empty. Serializable so it can ship inside components.
+ */
+public class DefaultSideInputReader implements SideInputReader, Serializable {
+
+    @Nullable
+    @Override
+    public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+        // There are no side inputs, so there is never a value to return.
+        return null;
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> view) {
+        return false;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return true;
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
new file mode 100644
index 0000000..ffdd19b
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+
+/**
+ * No-op StepContext implementation.
+ */
+public class DefaultStepContext implements ExecutionContext.StepContext {
+ @Override
+ public String getStepName() {
+ return null;
+ }
+
+ @Override
+ public String getTransformName() {
+ return null;
+ }
+
+ @Override
+ public void noteOutput(WindowedValue<?> windowedValue) {
+
+ }
+
+ @Override
+ public void noteSideOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+
+ }
+
+ @Override
+ public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder1) throws IOException {
+
+ }
+
+ @Override
+ public StateInternals<?> stateInternals() {
+ return null;
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ return null;
+ }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
new file mode 100644
index 0000000..8b14fed
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.beam.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ *
+ * Options are JSON-serialized eagerly in the constructor and lazily
+ * deserialized on first access after transport.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+    /** ObjectMapper is thread-safe and expensive to construct; share one instance. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final byte[] serializedOptions;
+
+    /** Lazily initialized copy of deserialized options */
+    private transient PipelineOptions pipelineOptions;
+
+    public SerializedPipelineOptions(PipelineOptions options) {
+        checkNotNull(options, "PipelineOptions must not be null.");
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            MAPPER.writeValue(baos, options);
+            this.serializedOptions = baos.toByteArray();
+        } catch (IOException e) {
+            // Catch only IOException (all writeValue/close can throw) rather
+            // than broad Exception, so programming errors propagate unchanged.
+            throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+        }
+    }
+
+    /** Deserializes (once) and returns the wrapped PipelineOptions. */
+    public PipelineOptions getPipelineOptions() {
+        if (pipelineOptions == null) {
+            try {
+                pipelineOptions = MAPPER.readValue(serializedOptions, PipelineOptions.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+            }
+        }
+
+        return pipelineOptions;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index a2aec4d..8242319 100644
--- a/pom.xml
+++ b/pom.xml
@@ -305,6 +305,7 @@
<module>external/storm-opentsdb</module>
<module>external/storm-kafka-monitor</module>
<module>external/storm-jms</module>
+ <module>external/storm-beam</module>
<!-- examples -->
<module>examples/storm-starter</module>