STORM-2174: Initial commit for beam runner.
diff --git a/external/storm-beam/pom.xml b/external/storm-beam/pom.xml
new file mode 100644
index 0000000..4654005
--- /dev/null
+++ b/external/storm-beam/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>1.1.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-beam</artifactId>
+    <packaging>jar</packaging>
+
+
+    <name>Apache Storm Beam Runner</name>
+    <properties>
+        <beam.version>0.2.0-incubating-SNAPSHOT</beam.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-core-java</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-direct-java</artifactId>
+            <version>${beam.version}</version>
+            <scope>runtime</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>1.0-rc2</version>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java
new file mode 100644
index 0000000..1c87e54
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/RandomSentenceSource.java
@@ -0,0 +1,109 @@
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A toy {@link UnboundedSource} used to exercise the runner: it cycles endlessly over a small,
+ * fixed set of sentences.
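+ *
+ * <p>A minimal usage sketch (mirroring {@link StormWordCount}):
+ * <pre>{@code
+ * PCollection<String> lines =
+ *     pipeline.apply("Spout", Read.from(new RandomSentenceSource(StringUtf8Coder.of())));
+ * }</pre>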
+ */
+public class RandomSentenceSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
+
+    private final Coder<String> coder;
+
+    public RandomSentenceSource(Coder<String> coder){
+        this.coder = coder;
+    }
+
+    @Override
+    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(int i, PipelineOptions pipelineOptions) throws Exception {
+        return Collections.singletonList(this);
+    }
+
+    @Override
+    public UnboundedReader<String> createReader(PipelineOptions pipelineOptions, @Nullable CheckpointMark checkpointMark) throws IOException {
+        return new RandomSentenceReader(this);
+    }
+
+    @Nullable
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+        return null;
+    }
+
+    @Override
+    public void validate() {
+
+    }
+
+    @Override
+    public Coder<String> getDefaultOutputCoder() {
+        return this.coder;
+    }
+
+
+
+    public static class RandomSentenceReader extends UnboundedReader<String> {
+
+        private String[] values = {"blah blah blah", "foo bar", "my dog has fleas"};
+        private int index = 0;
+        private final UnboundedSource<String, CheckpointMark> source;
+
+        public RandomSentenceReader(UnboundedSource<String, CheckpointMark> source){
+            this.source = source;
+        }
+
+
+        @Override
+        public boolean start() throws IOException {
+            index = 0;
+            return true;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+            index++;
+            if(index == values.length){
+                index = 0;
+            }
+            return true;
+        }
+
+        @Override
+        public Instant getWatermark() {
+            return Instant.now();
+        }
+
+        @Override
+        public CheckpointMark getCheckpointMark() {
+            return null;
+        }
+
+        @Override
+        public UnboundedSource<String, ?> getCurrentSource() {
+            return this.source;
+        }
+
+        @Override
+        public String getCurrent() throws NoSuchElementException {
+            return values[index];
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() throws NoSuchElementException {
+            return Instant.now();
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java
new file mode 100644
index 0000000..8c710ab
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormPipelineOptions.java
@@ -0,0 +1,12 @@
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Pipeline options for running Beam pipelines on Apache Storm.
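+ *
+ * <p>A sketch of one way to obtain them, relying on the standard Beam
+ * {@code PipelineOptionsFactory} and the {@code appName} option inherited from
+ * {@link ApplicationNameOptions}:
+ * <pre>{@code
+ * StormPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
+ *         .withValidation()
+ *         .as(StormPipelineOptions.class);
+ * options.setAppName("word-count");
+ * }</pre>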
+ */
+public interface StormPipelineOptions extends PipelineOptions, ApplicationNameOptions {
+
+
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java
new file mode 100644
index 0000000..0e10271
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRegistrar.java
@@ -0,0 +1,31 @@
+package org.apache.storm.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
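+/**
+ * Registers the Storm runner and its pipeline options with the Beam SDK. The {@code AutoService}
+ * annotations generate the {@code META-INF/services} entries Beam uses for discovery, so once this
+ * module is on the classpath the runner should be selectable by name (e.g. with a
+ * {@code --runner=StormRunner} pipeline option).
+ */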
+public class StormRegistrar {
+    private StormRegistrar(){}
+
+    @AutoService(PipelineRunnerRegistrar.class)
+    public static class Runner implements PipelineRunnerRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+            return ImmutableList.<Class<? extends PipelineRunner<?>>>of(
+                    StormRunner.class);
+        }
+    }
+
+    @AutoService(PipelineOptionsRegistrar.class)
+    public static class Options implements PipelineOptionsRegistrar {
+        @Override
+        public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+            return ImmutableList.<Class<? extends PipelineOptions>>of(
+                    StormPipelineOptions.class);
+        }
+    }
+
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java
new file mode 100644
index 0000000..cc92349
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormRunner.java
@@ -0,0 +1,170 @@
+package org.apache.storm.beam;
+
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.beam.translation.StormPipelineTranslator;
+import org.apache.storm.beam.translation.TranslationContext;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.*;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Main entry point into the Storm Runner.
+ * 
+ * After reading the user-defined pipeline, Beam invokes the run() method with a representation
+ * of the pipeline.
+ *
+ * TODO: Only supports Storm local mode for now.
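+ *
+ * A minimal usage sketch (assuming the runner is set programmatically rather than via
+ * {@code --runner}):
+ * <pre>{@code
+ * StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
+ * options.setRunner(StormRunner.class);
+ * Pipeline p = Pipeline.create(options);
+ * // ... apply transforms ...
+ * p.run();
+ * }</pre>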
+ */
+public class StormRunner extends PipelineRunner<StormRunner.StormPipelineResult> {
+    private static final Logger LOG = LoggerFactory.getLogger(StormRunner.class);
+    
+    private StormPipelineOptions options;
+
+    public StormRunner(StormPipelineOptions options){
+        this.options = options;
+    }
+
+    public static StormRunner fromOptions(PipelineOptions options){
+        StormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(StormPipelineOptions.class, options);
+        return new StormRunner(pipelineOptions);
+
+    }
+
+    @Override
+    public StormPipelineResult run(Pipeline pipeline) {
+        LOG.info("Running pipeline...");
+        TranslationContext context = new TranslationContext(this.options);
+        StormPipelineTranslator transformer = new StormPipelineTranslator(context);
+        transformer.translate(pipeline);
+
+        for(TranslationContext.Stream stream : context.getStreams()){
+            LOG.info(stream.getFrom() + " --> " + stream.getTo());
+        }
+
+        runTopologyLocal(getTopology(context));
+        return null;
+    }
+
+    private void runTopologyLocal(StormTopology topology){
+        Config conf = new Config();
+        conf.setMaxSpoutPending(1000);
+//        conf.setDebug(true);
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("word-count", conf, topology);
+
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+//        cluster.shutdown();
+    }
+
+    public static class StormPipelineResult implements PipelineResult {
+        private State state;
+
+        public State getState() {
+            return this.state;
+        }
+
+        public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
+            return null;
+        }
+    }
+
+    private StormTopology getTopology(TranslationContext context){
+        TopologyBuilder builder = new TopologyBuilder();
+        Map<String, IRichSpout> spouts = context.getSpouts();
+        for(String id : spouts.keySet()){
+            builder.setSpout(id, spouts.get(id));
+        }
+
+        HashMap<String, BoltDeclarer> declarers = new HashMap<String, BoltDeclarer>();
+        for (TranslationContext.Stream stream : context.getStreams()) {
+            Object boltObj = context.getBolt(stream.getTo());
+            BoltDeclarer declarer = declarers.get(stream.getTo());
+            if (boltObj instanceof IRichBolt) {
+                if(declarer == null) {
+                    declarer = builder.setBolt(stream.getTo(),
+                            (IRichBolt) boltObj);
+                    declarers.put(stream.getTo(), declarer);
+                }
+            } else if (boltObj instanceof IBasicBolt) {
+                if(declarer == null) {
+                    declarer = builder.setBolt(
+                            stream.getTo(),
+                            (IBasicBolt) boltObj);
+                    declarers.put(stream.getTo(), declarer);
+                }
+            } else if (boltObj instanceof IWindowedBolt) {
+                if(declarer == null) {
+                    declarer = builder.setBolt(
+                            stream.getTo(),
+                            (IWindowedBolt) boltObj);
+                    declarers.put(stream.getTo(), declarer);
+                }
+            } else if (boltObj instanceof IStatefulBolt) {
+                if(declarer == null) {
+                    declarer = builder.setBolt(
+                            stream.getTo(),
+                            (IStatefulBolt) boltObj);
+                    declarers.put(stream.getTo(), declarer);
+                }
+            } else {
+                throw new IllegalArgumentException("Class does not appear to be a bolt: " +
+                        boltObj.getClass().getName());
+            }
+
+            TranslationContext.Grouping grouping = stream.getGrouping();
+            // if the streamId is defined, use it for the grouping, otherwise assume storm's default stream
+            String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId());
+
+
+            switch (grouping.getType()) {
+                case SHUFFLE:
+                    declarer.shuffleGrouping(stream.getFrom(), streamId);
+                    break;
+                case FIELDS:
+                    //TODO check for null grouping args
+                    declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs()));
+                    break;
+                case ALL:
+                    declarer.allGrouping(stream.getFrom(), streamId);
+                    break;
+                case DIRECT:
+                    declarer.directGrouping(stream.getFrom(), streamId);
+                    break;
+                case GLOBAL:
+                    declarer.globalGrouping(stream.getFrom(), streamId);
+                    break;
+                case LOCAL_OR_SHUFFLE:
+                    declarer.localOrShuffleGrouping(stream.getFrom(), streamId);
+                    break;
+                case NONE:
+                    declarer.noneGrouping(stream.getFrom(), streamId);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+            }
+        }
+
+        return builder.createTopology();
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java
new file mode 100644
index 0000000..b98c1aa
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/StormWordCount.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * A minimal word count pipeline using the Beam API, running on top of Storm.
+ *
+ * When the Storm Runner is reasonably complete, running this pipeline in Storm
+ * should yield the same output as running it on the Beam DirectRunner.
+ *
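+ * <p>Assuming the runner has been registered via {@link StormRegistrar}, it should be possible to
+ * point this example at Storm with a pipeline option such as {@code --runner=StormRunner}.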
+ */
+public class StormWordCount {
+
+    static class ExtractWordsFn extends DoFn<String, String> {
+        private final Aggregator<Long, Long> emptyLines =
+                createAggregator("emptyLines", new Sum.SumLongFn());
+
+        @Override
+        public void processElement(ProcessContext c) {
+            if (c.element().trim().isEmpty()) {
+                emptyLines.addValue(1L);
+            }
+
+            // Split the line into words.
+            String[] words = c.element().split("[^a-zA-Z']+");
+
+            // Output each word encountered into the output PCollection.
+            for (String word : words) {
+                if (!word.isEmpty()) {
+                    System.out.println(word);
+                    c.output(word);
+                }
+            }
+        }
+    }
+
+    /**
+     * A SimpleFunction that converts a word and its count into a printable string.
+     */
+    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+        @Override
+        public String apply(KV<String, Long> input) {
+            String retval = input.getKey() + ": " + input.getValue();
+            System.out.println(retval);
+            return retval;
+        }
+    }
+
+    /**
+     * A PTransform that converts a PCollection containing lines of text into a PCollection of
+     * formatted word counts.
+     */
+    public static class CountWords extends PTransform<PCollection<String>,
+            PCollection<KV<String, Long>>> {
+        @Override
+        public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
+
+            // Convert lines of text into individual words.
+            PCollection<String> words = lines.apply(
+                    ParDo.of(new ExtractWordsFn()));
+
+            // Count the number of times each word occurs.
+            PCollection<KV<String, Long>> wordCounts =
+                    words.apply(Count.<String>perElement());
+
+            return wordCounts;
+        }
+    }
+
+    /**
+     * Options supported by {@link StormWordCount}.
+     * <p>Inherits standard configuration options.
+     */
+    public interface WordCountOptions extends PipelineOptions {
+
+    }
+
+    public static void main(String[] args) {
+        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
+                .as(WordCountOptions.class);
+        Pipeline p = Pipeline.create(options);
+        p.apply("Spout", Read.from(new RandomSentenceSource(StringUtf8Coder.of())))
+                .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardSeconds(2))))
+                .apply("ExtractWords", ParDo.of(new ExtractWordsFn()))
+                .apply(new CountWords());
+
+        p.run();
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..455d8a2
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/GroupByKeyTranslator.java
@@ -0,0 +1,58 @@
+package org.apache.storm.beam.translation;
+
+import com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.GroupByKeyCompleteBolt;
+import org.apache.storm.beam.translation.runtime.GroupByKeyInitBolt;
+
+import java.util.List;
+
+/**
+ * Translates a Beam GroupByKey operation into a pair of Storm Bolts with a fields grouping.
+ *
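+ * The generated sub-topology looks like:
+ * <pre>
+ *   [from] --shuffle--> [from_GBK_init] --fields("keyValue")--> [from_GBK_complete] --shuffle--> [to]
+ * </pre>
+ *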
+ * TODO: From a Beam perspective this is likely the wrong approach to doing GBK
+ */
+public class GroupByKeyTranslator<K, V> implements
+        TransformTranslator<GroupByKey<K, V>> {
+    @Override
+    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+        PValue pvFrom = (PValue)context.getCurrentTransform().getInput();
+
+        PValue pvTo = (PValue)context.getCurrentTransform().getEnclosingNode().getOutput();
+
+        String from = baseName(pvFrom.getName());
+        String to = baseName(pvTo.getName());
+        context.activateGBK(to);
+        String initBolt = from + "_GBK_init"; // first GBK bolt
+        String completeBolt = from + "_GBK_complete";
+
+        GroupByKeyInitBolt gbkInit = new GroupByKeyInitBolt();
+        GroupByKeyCompleteBolt gbkComplete = new GroupByKeyCompleteBolt();
+
+
+        // from --> initBolt
+        TranslationContext.Stream stream = new TranslationContext.Stream(from, initBolt, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+        context.addStream(stream);
+        context.addBolt(initBolt, gbkInit);
+
+        // initBolt --> completeBolt
+        TranslationContext.Grouping fieldsGrouping = new TranslationContext.Grouping(TranslationContext.Grouping.Type.FIELDS);
+        List<String> fields = Lists.newArrayList();
+        fields.add("keyValue");
+        fieldsGrouping.setArgs(fields);
+        context.addBolt(completeBolt, gbkComplete);
+        stream = new TranslationContext.Stream(initBolt, completeBolt, fieldsGrouping);
+        context.addStream(stream);
+
+        // completeBolt --> to
+        stream = new TranslationContext.Stream(completeBolt, to, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+        context.addStream(stream);
+    }
+
+
+    private static String baseName(String str){
+        return str.substring(0, str.lastIndexOf("."));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java
new file mode 100644
index 0000000..671b84d
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/ParDoBoundTranslator.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.DoFnBolt;
+import org.apache.storm.beam.translation.util.DefaultSideInputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a ParDo.Bound to a Storm {@link DoFnBolt}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT> implements
+        TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+    @Override
+    public void translateNode(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getFn();
+        PCollection<OutputT> output = context.getOutput();
+        WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
+
+        DoFnBolt<InputT, OutputT> bolt = new DoFnBolt<>(context.getOptions(), doFn,
+                windowingStrategy, new DefaultSideInputReader());
+
+        PValue pvFrom = (PValue)context.getCurrentTransform().getInput();
+        String from = baseName(pvFrom.getName());
+        if(context.isGBKActive()){
+            from = context.completeGBK();
+        }
+        LOG.info(baseName(pvFrom.getName()));
+
+        PValue pvTo = (PValue)context.getCurrentTransform().getOutput();
+        LOG.info(baseName(pvTo.getName()));
+        String to = baseName(pvTo.getName());
+
+        TranslationContext.Stream stream = new TranslationContext.Stream(from, to, new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+
+        context.addStream(stream);
+        context.addBolt(baseName(pvTo.getName()), bolt);
+    }
+
+    private static String baseName(String str){
+        return str.substring(0, str.lastIndexOf("."));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
new file mode 100644
index 0000000..d3dd478
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/StormPipelineTranslator.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipeline visitor that walks a Beam {@link Pipeline} and delegates each primitive transform to
+ * the {@link TransformTranslator} registered for it in {@link TranslatorRegistry}.
+ */
+public class StormPipelineTranslator implements Pipeline.PipelineVisitor {
+    private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+    private TranslationContext context;
+
+    public StormPipelineTranslator(TranslationContext context){
+        this.context = context;
+    }
+
+
+    public void translate(Pipeline pipeline) {
+        pipeline.traverseTopologically(this);
+    }
+
+    public CompositeBehavior enterCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("entering composite translation {}", transformTreeNode.getTransform());
+        return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    public void leaveCompositeTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("leaving composite translation {}", transformTreeNode.getTransform());
+    }
+
+    public void visitPrimitiveTransform(TransformTreeNode transformTreeNode) {
+        LOG.info("visiting transform {}", transformTreeNode.getTransform());
+        PTransform transform = transformTreeNode.getTransform();
+        LOG.info("class: {}", transform.getClass());
+        TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+        if(translator != null) {
+            context.setCurrentTransform(transformTreeNode);
+            translator.translateNode(transformTreeNode.getTransform(), context);
+        } else {
+            LOG.warn("No translator found for {}", transform.getClass());
+        }
+    }
+
+    public void visitValue(PValue value, TransformTreeNode transformTreeNode) {
+        LOG.info("visiting value {}", value);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
new file mode 100644
index 0000000..cb31060
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TransformTranslator.java
@@ -0,0 +1,11 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into Storm primitives.
+ */
+public interface TransformTranslator<Type extends PTransform> {
+
+    void translateNode(Type transform, TranslationContext context);
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
new file mode 100644
index 0000000..b3920c0
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslationContext.java
@@ -0,0 +1,186 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.topology.IRichSpout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Maintains the state necessary during Pipeline translation to build a Storm topology.
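+ *
+ * <p>Translators typically register components and wire them together; an illustrative sketch
+ * (the component names are made up):
+ * <pre>{@code
+ * context.addBolt("myBolt", bolt);
+ * context.addStream(new TranslationContext.Stream("upstream", "myBolt",
+ *         new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE)));
+ * }</pre>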
+ */
+public class TranslationContext {
+    private StormPipelineOptions options;
+
+    private TransformTreeNode currentTransform;
+
+    private Map<String, IRichSpout> spoutMap = new HashMap<String, IRichSpout>();
+
+    private Map<String, Object> boltMap = new HashMap<String, Object>();
+
+    private List<Stream> streams = new ArrayList<Stream>();
+
+    public TranslationContext(StormPipelineOptions options){
+        this.options = options;
+
+    }
+
+    private String gbkTo = null;
+
+    public StormPipelineOptions getOptions(){
+        return this.options;
+    }
+
+    public void addSpout(String id, IRichSpout spout){
+        this.spoutMap.put(id, spout);
+    }
+
+    public Map<String, IRichSpout> getSpouts(){
+        return this.spoutMap;
+    }
+
+    public void addBolt(String id, Object bolt){
+        this.boltMap.put(id, bolt);
+    }
+
+    public Object getBolt(String id){
+        return this.boltMap.get(id);
+    }
+
+    public void addStream(Stream stream){
+        this.streams.add(stream);
+    }
+    public List<Stream> getStreams(){
+        return this.streams;
+    }
+
+    public void setCurrentTransform(TransformTreeNode transform){
+        this.currentTransform = transform;
+    }
+
+    public TransformTreeNode getCurrentTransform(){
+        return this.currentTransform;
+    }
+
+    public <InputT extends PInput> InputT getInput() {
+        return (InputT) getCurrentTransform().getInput();
+    }
+
+    public <OutputT extends POutput> OutputT getOutput() {
+        return (OutputT) getCurrentTransform().getOutput();
+    }
+
+    public void activateGBK(String gbkTo){
+        this.gbkTo = gbkTo;
+    }
+
+    public String completeGBK(){
+        String gbkTo = this.gbkTo;
+        this.gbkTo = null;
+        return gbkTo;
+    }
+
+    public boolean isGBKActive(){
+        return this.gbkTo != null;
+    }
+
+
+
+    public static class Stream {
+
+        private String from;
+        private String to;
+        private Grouping grouping;
+
+        public Stream(String from, String to, Grouping grouping){
+            this.from = from;
+            this.to = to;
+            this.grouping = grouping;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public void setTo(String to) {
+            this.to = to;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public void setFrom(String from) {
+            this.from = from;
+        }
+
+        public Grouping getGrouping() {
+            return grouping;
+        }
+
+        public void setGrouping(Grouping grouping) {
+            this.grouping = grouping;
+        }
+    }
+
+    public static class Grouping {
+
+        /**
+         * Types of stream groupings Storm allows
+         */
+        public static enum Type {
+            ALL,
+            CUSTOM,
+            DIRECT,
+            SHUFFLE,
+            LOCAL_OR_SHUFFLE,
+            FIELDS,
+            GLOBAL,
+            NONE
+        }
+
+        private Type type;
+        private String streamId; // for named streams, other than DEFAULT
+        private List<String> args; // arguments for fields grouping
+
+
+        public Grouping(Type type){
+            this.type = type;
+        }
+
+        public Grouping(List<String> args){
+            this.type = Type.FIELDS;
+            this.args = args;
+        }
+        public List<String> getArgs() {
+            return args;
+        }
+
+        public void setArgs(List<String> args) {
+            this.args = args;
+        }
+
+        public Type getType() {
+            return type;
+        }
+
+        public void setType(Type type) {
+            this.type = type;
+        }
+
+        public String getStreamId() {
+            return streamId;
+        }
+
+        public void setStreamId(String streamId) {
+            this.streamId = streamId;
+        }
+
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..bdb42f8
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/TranslatorRegistry.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
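+ *
+ * <p>Support for an additional transform would be added by registering it in the static block
+ * below, e.g. (hypothetical names):
+ * {@code TRANSLATORS.put(MyTransform.class, new MyTransformTranslator<>())}.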
+ */
+public class TranslatorRegistry {
+    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
+
+    static {
+        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+        TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+        TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslator<>());
+        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator<>());
+    }
+
+    static TransformTranslator<?> getTranslator(
+            PTransform<?, ?> transform) {
+        return TRANSLATORS.get(transform.getClass());
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..c177a89
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/UnboundedSourceTranslator.java
@@ -0,0 +1,21 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.translation.runtime.UnboundedSourceSpout;
+
+/**
+ * Translates a Read.Unbounded into a Storm spout.
+ * @param <T>
+ */
+public class UnboundedSourceTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+        UnboundedSource source = transform.getSource();
+        StormPipelineOptions options = context.getOptions();
+        UnboundedSourceSpout spout = new UnboundedSourceSpout(source, options);
+
+        String name = context.getCurrentTransform().getFullName();
+        context.addSpout(name, spout);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
new file mode 100644
index 0000000..be6d1f9
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/WindowBoundTranslator.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation;
+
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.storm.beam.translation.runtime.WindowBolt;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a Window.Bound node into a Storm {@link WindowBolt}.
+ * @param <T>
+ */
+public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+
+    @Override
+    public void translateNode(Window.Bound<T> transform, TranslationContext context) {
+        if(transform.getWindowFn() instanceof FixedWindows){
+            Duration size = ((FixedWindows) transform.getWindowFn()).getSize();
+
+            WindowBolt bolt = new WindowBolt();
+            bolt.withTumblingWindow(WindowBolt.Duration.seconds((int)size.getStandardSeconds()));
+
+            PValue from = (PValue)context.getCurrentTransform().getInput();
+            LOG.info(baseName(from.getName()));
+
+            PValue to = (PValue)context.getCurrentTransform().getOutput();
+            LOG.info(baseName(to.getName()));
+
+            TranslationContext.Stream stream = new TranslationContext.Stream(baseName(from.getName()), baseName(to.getName()), new TranslationContext.Grouping(TranslationContext.Grouping.Type.SHUFFLE));
+
+            context.addStream(stream);
+            context.addBolt(baseName(to.getName()), bolt);
+
+        } else {
+            throw new UnsupportedOperationException("Currently only fixed windows are supported.");
+        }
+    }
+
+
+    private static String baseName(String str){
+        return str.substring(0, str.lastIndexOf("."));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
new file mode 100644
index 0000000..3a166e7
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/DoFnBolt.java
@@ -0,0 +1,102 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.translation.util.DefaultStepContext;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm bolt that executes a Beam {@link DoFn} over incoming {@link WindowedValue}s using a
+ * {@link DoFnRunner}.
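+ *
+ * <p>The first field of each input tuple is expected to hold either a single
+ * {@code WindowedValue} or a {@code List} of them (as emitted by {@link WindowBolt}); the outputs
+ * produced by the bundle are emitted downstream as a single list in the "value" field.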
+ */
+public class DoFnBolt<InputT, OutputT> extends BaseRichBolt implements DoFnRunners.OutputManager{
+
+    private transient DoFnRunner<InputT, OutputT> runner = null;
+
+    private final TupleTag<OutputT> tupleTag = new TupleTag<OutputT>() {};
+
+    private transient OutputCollector collector;
+
+    private List<WindowedValue<OutputT>> output = Lists.newArrayList();
+
+    private SerializedPipelineOptions serializedOptions;
+    private transient StormPipelineOptions pipelineOptions;
+
+    private DoFn<InputT, OutputT> doFn;
+    private WindowingStrategy<?, ?> windowingStrategy;
+    private SideInputReader sideInputReader;
+
+    public DoFnBolt(
+            StormPipelineOptions pipelineOptions,
+            DoFn<InputT, OutputT> doFn,
+            WindowingStrategy<?, ?> windowingStrategy,
+            SideInputReader sideInputReader){
+        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+        this.doFn = doFn;
+        this.windowingStrategy = windowingStrategy;
+        this.sideInputReader = sideInputReader;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+        Counter<Integer> counter = Counter.ints("foo", Counter.AggregationKind.SUM);
+        CounterSet counters = new CounterSet(counter);
+
+        this.runner = new StormDoFnRunner(this.pipelineOptions, this.doFn, this.sideInputReader, this, this.tupleTag, TupleTagList.empty().getAll(), new DefaultStepContext(), counters.getAddCounterMutator(), this.windowingStrategy);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        System.out.println("Type: " + input.getValue(0).getClass());
+        Object value = input.getValue(0);
+        this.output = Lists.newArrayList();
+        this.runner.startBundle();
+        if(value instanceof List){
+            for(Object o : ((List)value)){
+                this.runner.processElement((WindowedValue)o);
+            }
+
+        } else {
+            this.runner.processElement((WindowedValue) input.getValue(0));
+        }
+        this.runner.finishBundle();
+
+//        for(WindowedValue val : this.output){
+//            this.collector.emit(input, new Values(val));
+//        }
+        this.collector.emit(new Values(this.output));
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+
+
+    @Override
+    public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
+        if(this.tupleTag.equals(tupleTag)){
+            this.output.add((WindowedValue<OutputT>)windowedValue);
+        } else {
+            throw new RuntimeException("Wrong tag");
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
new file mode 100644
index 0000000..d541709
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyCompleteBolt.java
@@ -0,0 +1,40 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Second stage of the GroupByKey translation: receives the windowed values partitioned by key
+ * (via a fields grouping from {@link GroupByKeyInitBolt}) and forwards them downstream.
+ */
+public class GroupByKeyCompleteBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public GroupByKeyCompleteBolt() {
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        this.collector.emit(input, new Values(input.getValueByField("windowedValue")));
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
new file mode 100644
index 0000000..aaba57e
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/GroupByKeyInitBolt.java
@@ -0,0 +1,47 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * First stage of the GroupByKey translation: unpacks a list of windowed {@link KV} values and
+ * re-emits each element keyed by its KV key, so the downstream fields grouping partitions by key.
+ */
+public class GroupByKeyInitBolt extends BaseRichBolt {
+    private OutputCollector collector;
+
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public GroupByKeyInitBolt() {
+    }
+
+    @Override
+    public void execute(Tuple input) {
+
+        List<WindowedValue<KV>> values = (List<WindowedValue<KV>>) input.getValue(0);
+        for (WindowedValue<KV> value : values) {
+            KV kv = value.getValue();
+            Object key = kv.getKey();
+            this.collector.emit(input, new Values(key, value));
+        }
+        // ack the input tuple once, after all of its grouped values have been emitted
+        this.collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("keyValue", "windowedValue"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
new file mode 100644
index 0000000..632fdbf
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/StormDoFnRunner.java
@@ -0,0 +1,27 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.*;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable convenience subclass of Beam's {@link SimpleDoFnRunner}, used by {@link DoFnBolt}.
+ */
+public class StormDoFnRunner extends SimpleDoFnRunner implements Serializable {
+    public StormDoFnRunner(PipelineOptions options,
+                           DoFn fn,
+                           SideInputReader sideInputReader,
+                           DoFnRunners.OutputManager outputManager,
+                           TupleTag mainOutputTag, List sideOutputTags,
+                           ExecutionContext.StepContext stepContext,
+                           CounterSet.AddCounterMutator addCounterMutator,
+                           WindowingStrategy windowingStrategy) {
+        super(options, fn, sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext,
+                addCounterMutator, windowingStrategy);
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
new file mode 100644
index 0000000..349576e
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/UnboundedSourceSpout.java
@@ -0,0 +1,109 @@
+package org.apache.storm.beam.translation.runtime;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.storm.beam.StormPipelineOptions;
+import org.apache.storm.beam.util.SerializedPipelineOptions;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Spout implementation that wraps a Beam {@link UnboundedSource}.
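+ *
+ * <p>Each call to {@code nextTuple()} advances the reader and, when a value is available, emits a
+ * single-field tuple ("value") holding the element wrapped as a {@code WindowedValue} in the
+ * global window, with a random UUID as the message id.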
+ */
+public class UnboundedSourceSpout extends BaseRichSpout{
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+    private UnboundedSource source;
+    private transient UnboundedSource.UnboundedReader reader;
+    private SerializedPipelineOptions serializedOptions;
+    private transient StormPipelineOptions pipelineOptions;
+
+    private SpoutOutputCollector collector;
+
+    public UnboundedSourceSpout(UnboundedSource source, StormPipelineOptions options){
+        this.source = source;
+        this.serializedOptions = new SerializedPipelineOptions(options);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        try {
+            this.reader.close();
+        } catch (IOException e) {
+            LOG.error("Error closing unbounded reader.", e);
+        }
+    }
+
+    @Override
+    public void activate() {
+        super.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        super.deactivate();
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        super.ack(msgId);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        super.fail(msgId);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return super.getComponentConfiguration();
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("value"));
+
+    }
+
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        try {
+
+            this.collector = collector;
+            this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+            this.reader = this.source.createReader(this.pipelineOptions, null);
+            this.reader.start();
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to create unbounded reader.", e);
+        }
+
+    }
+
+    public void nextTuple() {
+        try {
+            if(this.reader.advance()){
+                Object value = reader.getCurrent();
+                Instant timestamp = reader.getCurrentTimestamp();
+                Instant watermark = reader.getWatermark();
+
+                WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+                collector.emit(new Values(wv), UUID.randomUUID());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Exception reading values from source.", e);
+        }
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
new file mode 100644
index 0000000..4e8a7fb
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/runtime/WindowBolt.java
@@ -0,0 +1,48 @@
+package org.apache.storm.beam.translation.runtime;
+
+import com.google.common.collect.Lists;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm windowed bolt that gathers the values of each tumbling window and emits them downstream
+ * as a single list (field "window"), allowing a downstream {@link DoFnBolt} to treat the window
+ * as one bundle.
+ */
+public class WindowBolt extends BaseWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBolt.class);
+
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(TupleWindow inputWindow) {
+        LOG.info("*******************Executing window with size {}", inputWindow.get().size());
+        LOG.info("Type: {}", inputWindow.get().getClass());
+        List values = Lists.newArrayList();
+        for(Tuple t :inputWindow.get()){
+            values.add(t.getValue(0));
+        }
+        collector.emit(inputWindow.get(), new Values(values));
+
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("window"));
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
new file mode 100644
index 0000000..bea9e8d
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultSideInputReader.java
@@ -0,0 +1,29 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * No-op SideInputReader implementation.
+ */
+public class DefaultSideInputReader implements SideInputReader, Serializable {
+    @Nullable
+    @Override
+    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+        return null;
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> pCollectionView) {
+        return false;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return true;
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
new file mode 100644
index 0000000..ffdd19b
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/translation/util/DefaultStepContext.java
@@ -0,0 +1,51 @@
+package org.apache.storm.beam.translation.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+
+/**
+ * No-op StepContext implementation.
+ */
+public class DefaultStepContext implements ExecutionContext.StepContext {
+    @Override
+    public String getStepName() {
+        return null;
+    }
+
+    @Override
+    public String getTransformName() {
+        return null;
+    }
+
+    @Override
+    public void noteOutput(WindowedValue<?> windowedValue) {
+
+    }
+
+    @Override
+    public void noteSideOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder1) throws IOException {
+
+    }
+
+    @Override
+    public StateInternals<?> stateInternals() {
+        return null;
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+        return null;
+    }
+}
diff --git a/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
new file mode 100644
index 0000000..8b14fed
--- /dev/null
+++ b/external/storm-beam/src/main/java/org/apache/storm/beam/util/SerializedPipelineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.beam.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+  private final byte[] serializedOptions;
+
+  /** Lazily initialized copy of deserialized options */
+  private transient PipelineOptions pipelineOptions;
+
+  public SerializedPipelineOptions(PipelineOptions options) {
+    checkNotNull(options, "PipelineOptions must not be null.");
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      new ObjectMapper().writeValue(baos, options);
+      this.serializedOptions = baos.toByteArray();
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+    }
+
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    if (pipelineOptions == null) {
+      try {
+        pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+      } catch (IOException e) {
+        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+      }
+    }
+
+    return pipelineOptions;
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index a2aec4d..8242319 100644
--- a/pom.xml
+++ b/pom.xml
@@ -305,6 +305,7 @@
         <module>external/storm-opentsdb</module>
         <module>external/storm-kafka-monitor</module>
         <module>external/storm-jms</module>
+        <module>external/storm-beam</module>
 
         <!-- examples -->
         <module>examples/storm-starter</module>