Merge pull request #57

This closes #57
diff --git a/site/_data/mydoc/mydoc_sidebar.yml b/site/_data/mydoc/mydoc_sidebar.yml
index 9d770ab..f43d78a 100644
--- a/site/_data/mydoc/mydoc_sidebar.yml
+++ b/site/_data/mydoc/mydoc_sidebar.yml
@@ -162,6 +162,22 @@
       version: all
       output: web, pdf
 
+    - title: How can I run analytics on several tuples in parallel?
+      url: /recipes/recipe_parallel_analytics.html
+      audience: writers, designers
+      platform: all
+      product: all
+      version: all
+      output: web, pdf
+
+    - title: How can I run several analytics on a tuple concurrently?
+      url: /recipes/recipe_concurrent_analytics.html
+      audience: writers, designers
+      platform: all
+      product: all
+      version: all
+      output: web, pdf
+
   - title: Sample Programs
     audience: writers, designers
     platform: all
diff --git a/site/recipes/recipe_concurrent_analytics.md b/site/recipes/recipe_concurrent_analytics.md
new file mode 100644
index 0000000..57d6aaa
--- /dev/null
+++ b/site/recipes/recipe_concurrent_analytics.md
@@ -0,0 +1,195 @@
+---
+title: How can I run several analytics on a tuple concurrently?
+---
+
+If you have several independent lengthy analytics to perform on each tuple, you may determine that it would be advantageous to perform the analytics concurrently and then combine their results.
+
+The overall proessing time for a single tuple is then roughly that of the slowest analytic pipeline instead of the aggregate of each analytic pipeline.
+
+This usage model is in contrast to what's often referred to as _parallel_ tuple processing where several tuples are processed in parallel in replicated pipeline channels.
+
+e.g., for independent analytic pipelines A1, A2, and A3, you want to change the serial processing flow graph from:
+
+```
+sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+```
+
+to a flow where the analytics run concurrently in a flow like:
+
+```
+                     |-> A1 ->|
+sensorReadings<T> -> |-> A2 ->| -> results<R>
+                     |-> A3 ->|
+```
+
+The key to the above flow is to use a _barrier_ to synchronize the results from each of the pipelines so they can be combined into a single result tuple.  Each of the concurrent channels also needs a thread to run its analytic pipeline.
+
+`PlumbingStreams.concurrent()` builds a concurrent flow graph for you.  Alternatively, you can use `PlumbingStreams.barrier()` and `PlumbingStreams.isolate()` and build a concurrent flow graph yourself.
+
+More specifically `concurrent()` generates a flow like:
+
+```
+          |-> isolate(1) -> pipeline1 -> |
+stream -> |-> isolate(1) -> pipeline2 -> |-> barrier(10) -> combiner 
+          |-> isolate(1) -> pipeline3 -> |
+```
+
+It's easy to use `concurrent()`!
+
+## Define the collection of analytic pipelines to run
+
+For the moment assume we have defined methods to create each pipeline: `a1pipeline()`, `a2pipeline()` and `a3pipeline()`. In this simple recipe each pipeline receives a `TStream<Double>` as input and generates a `TStream<String>` as output.
+
+```java
+List<Function<TStream<Double>, TStream<String>>> pipelines = new ArrayList<>();
+pipelines.add(a1pipeline());
+pipelines.add(a2pipeline());
+pipelines.add(a3pipeline());
+```
+
+## Define the result combiner
+
+Each pipeline creates one result tuple for each input tuple.  The `barrier` collects one tuple from each pipeline and then creates a list of those tuples. The combiner is invoked with that list to generate the final aggregate result tuple.
+
+In this recipe the combiner is a simple lambda function that returns the input list:
+
+```java
+Function<List<String>, List<String>> combiner = list -> list;
+```
+
+## Build the concurrent flow
+
+```java
+TStream<List<String>> results = PlumbingStreams.concurrent(readings, pipelines, combiner);
+```
+
+## Define your analytic pipelines
+
+For each analytic pipeline, define a `Function<TStream<T>, TStream<U>>` that will create the pipeline.  That is, define a function that takes a `TStream<T>` as its input and yields a `TStream<U>` as its result.  Of course, `U` can be the same type as `T`.
+
+In this recipe we'll just define some very simple pipelines and use sleep to simulate some long processing times.
+
+Here's the A3 pipeline builder:
+
+```java
+static Function<TStream<Double>,TStream<String>> a3pipeline() {
+    // simple 3 stage pipeline simulating some amount of work by sleeping
+    return stream -> stream.map(tuple -> {
+        sleep(800, TimeUnit.MILLISECONDS);
+        return "This is the a3pipeline result for tuple "+tuple;
+      }).tag("a3.stage1")
+      .map(Functions.identity()).tag("a3.stage2")
+      .map(Functions.identity()).tag("a3.stage3");
+}
+```
+
+## The final application
+
+When the application is run it prints out an aggregate result (a list of one tuple from each pipeline) every second. If the three pipelines were run serially, it would take on the order of 2.4 seconds to generate each aggregate result.
+
+```java
+package quarks.samples.topology;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.Function;
+import quarks.function.Functions;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.SimpleSimulatedSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+
+/**
+ * A recipe for concurrent analytics.
+ */
+public class ConcurrentRecipe {
+
+    /**
+     * Concurrently run a collection of long running independent
+     * analytic pipelines on each tuple.
+     */
+    public static void main(String[] args) throws Exception {
+
+        DirectProvider dp = new DevelopmentProvider();
+        System.out.println("development console url: "
+                + dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+        Topology top = dp.newTopology("ConcurrentRecipe");
+        
+        // Define the list of independent unique analytic pipelines to include
+        List<Function<TStream<Double>,TStream<String>>> pipelines = new ArrayList<>();
+        pipelines.add(a1pipeline());
+        pipelines.add(a2pipeline());
+        pipelines.add(a3pipeline());
+        
+        // Define the result combiner function.  The combiner receives 
+        // a tuple containing a list of tuples, one from each pipeline, 
+        // and returns a result tuple of any type from them.
+        // In this recipe we'll just return the list.
+        Function<List<String>,List<String>> combiner = list -> list;
+        
+        // Generate a polled simulated sensor stream
+        SimpleSimulatedSensor sensor = new SimpleSimulatedSensor();
+        TStream<Double> readings = top.poll(sensor, 1, TimeUnit.SECONDS)
+                                      .tag("readings");
+        
+        // Build the concurrent analytic pipeline flow
+        TStream<List<String>> results = 
+            PlumbingStreams.concurrent(readings, pipelines, combiner)
+            .tag("results");
+        
+        // Print out the results.
+        results.sink(list -> System.out.println(new Date().toString() + " results tuple: " + list));
+
+        System.out.println("Notice how an aggregate result is generated every second."
+            + "\nEach aggregate result would take 2.4sec if performed serially.");
+        dp.submit(top);
+    }
+    
+    /** Function to create analytic pipeline a1 and add it to a stream */
+    private static Function<TStream<Double>,TStream<String>> a1pipeline() {
+        // a simple 1 stage pipeline simulating some amount of work by sleeping
+        return stream -> stream.map(tuple -> {
+            sleep(800, TimeUnit.MILLISECONDS);
+            return "This is the a1pipeline result for tuple "+tuple;
+          }).tag("a1.stage1");
+    }
+    
+    /** Function to create analytic pipeline a2 and add it to a stream */
+    private static Function<TStream<Double>,TStream<String>> a2pipeline() {
+        // a simple 2 stage pipeline simulating some amount of work by sleeping
+        return stream -> stream.map(tuple -> {
+            sleep(800, TimeUnit.MILLISECONDS);
+            return "This is the a2pipeline result for tuple "+tuple;
+          }).tag("a2.stage1")
+          .map(Functions.identity()).tag("a2.stage2");
+    }
+    
+    /** Function to create analytic pipeline a3 and add it to a stream */
+    private static Function<TStream<Double>,TStream<String>> a3pipeline() {
+        // a simple 3 stage pipeline simulating some amount of work by sleeping
+        return stream -> stream.map(tuple -> {
+            sleep(800, TimeUnit.MILLISECONDS);
+            return "This is the a3pipeline result for tuple "+tuple;
+          }).tag("a3.stage1")
+          .map(Functions.identity()).tag("a3.stage2")
+          .map(Functions.identity()).tag("a3.stage3");
+    }
+
+    private static void sleep(long period, TimeUnit unit) throws RuntimeException {
+        try {
+            Thread.sleep(unit.toMillis(period));
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted", e);
+        }
+    }
+
+}
+
+```
diff --git a/site/recipes/recipe_parallel_analytics.md b/site/recipes/recipe_parallel_analytics.md
new file mode 100644
index 0000000..965f455
--- /dev/null
+++ b/site/recipes/recipe_parallel_analytics.md
@@ -0,0 +1,172 @@
+---
+title: How can I run analytics on several tuples in parallel?
+---
+
+If the duration of your per-tuple analytic processing makes your application unable to keep up with the tuple ingest rate or result generation rate, you can often run analytics on several tuples in parallel to improve performance.
+
+The overall proessing time for a single tuple is still the same but the processing for each tuple is overlapped. In the extreme your application may be able to process N tuples in the same time that it would have processed one.
+
+This usage model is in contrast to what's been called _concurrent analytics_, where multiple different independent analytics for a single tuple are performed concurrently, as when using `PlumbingStreams.concurrent()`.
+
+e.g., imagine your analytic pipeline has three stages to it: A1, A2, A3, and that A2 dominates the processing time. You want to change the serial processing flow graph from:
+
+```
+sensorReadings<T> -> A1 -> A2 -> A3 -> results<R>
+```
+
+to a flow where the A2 analytics run on several tuples in parallel in a flow like:
+
+```
+                           |-> A2-channel0 ->|
+sensorReadings<T> -> A1 -> |-> A2-channel1 ->| -> A3 -> results<R>
+                           |-> A2-channel2 ->|
+                           |-> A2-channel3 ->|
+                           |-> A2-channel4 ->|
+                                  ...
+```
+
+The key to the above flow is to use a _splitter_ to distribute the tuples among the parallel channels. Each of the parallel channels also needs a thread to run its analytic pipeline.
+
+`PlumbingStreams.parallel()` builds a parallel flow graph for you. Alternatively, you can use `TStream.split()`, `PlumbingStreams.isolate()`, and `TStream.union()` and build a parallel flow graph yourself.
+
+More specifically `parallel()` generates a flow like:
+
+```
+                                   |-> isolate(10) -> pipeline-ch0 -> |
+stream -> split(width,splitter) -> |-> isolate(10) -> pipeline-ch1 -> |-> union -> isolate(width) 
+                                   |-> isolate(10) -> pipeline-ch2 -> |
+                                       ...
+```
+
+It's easy to use `parallel()`!
+
+## Define the splitter
+
+The splitter function partitions the tuples among the parallel channels. `PlumbingStreams.roundRobinSplitter()` is a commonly used splitter that simply cycles among each channel in succession. The round robin strategy works great when the processing time of tuples is uniform. Other splitter functions may use information in the tuple to decide how to partition them.
+
+This recipe just uses the round robin splitter for a `TStream<Double>`.
+
+```java
+int width = 5;  // number of parallel channels
+
+ToIntFunction<Double> splitter = PlumbingStreams.roundRobinSplitter(width);
+```
+
+## Define the pipeline to run in parallel
+
+Define a `BiFunction<TStream<T>, Integer, TStream<R>>` that builds the pipeline. That is, define a function that receives a `TStream<T>` and an integer `channel` and creates a pipeline for that channel that returns a `TStream<R>`.
+
+Many pipelines don't care what channel they're being constructed for. While the pipeline function typically yields the same pipeline processing for each channel there is no requirement for it to do so.
+
+In this simple recipe the pipeline receives a `TStream<Double>` as input and generates a `TStream<String>` as output.
+
+```java
+static BiFunction<TStream<Double>, Integer, TStream<String>> pipeline() {
+    // a simple 4 stage pipeline simulating some amount of work by sleeping
+    return (stream, channel) -> 
+      { 
+        String tagPrefix = "pipeline-ch"+channel;
+        return stream.map(tuple -> {
+            sleep(1000, TimeUnit.MILLISECONDS);
+            return "This is the "+tagPrefix+" result for tuple "+tuple;
+          }).tag(tagPrefix+".stage1")
+          .map(Functions.identity()).tag(tagPrefix+".stage2")
+          .map(Functions.identity()).tag(tagPrefix+".stage3");
+          .map(Functions.identity()).tag(tagPrefix+".stage4");
+      };
+}
+```
+
+## Build the parallel flow
+
+Given a width, splitter and pipeline function it just takes a single call:
+
+```java
+TStream<String> results = PlumbingStreams.parallel(readings, width, splitter, pipeline());
+```
+
+## The final application
+
+When the application is run it prints out 5 (width) tuples every second. Without the parallel channels, it would only print one tuple each second.
+
+```java
+package quarks.samples.topology;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import quarks.console.server.HttpServer;
+import quarks.function.BiFunction;
+import quarks.function.Functions;
+import quarks.providers.development.DevelopmentProvider;
+import quarks.providers.direct.DirectProvider;
+import quarks.samples.utils.sensor.SimpleSimulatedSensor;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+
+/**
+ * A recipe for parallel analytics.
+ */
+public class ParallelRecipe {
+
+    /**
+     * Process several tuples in parallel in a replicated pipeline.
+     */
+    public static void main(String[] args) throws Exception {
+
+        DirectProvider dp = new DevelopmentProvider();
+        System.out.println("development console url: "
+                + dp.getServices().getService(HttpServer.class).getConsoleUrl());
+
+        Topology top = dp.newTopology("ParallelRecipe");
+
+        // The number of parallel processing channels to generate
+        int width = 5;
+        
+        // Define the splitter
+        ToIntFunction<Double> splitter = PlumbingStreams.roundRobinSplitter(width);
+        
+        // Generate a polled simulated sensor stream
+        SimpleSimulatedSensor sensor = new SimpleSimulatedSensor();
+        TStream<Double> readings = top.poll(sensor, 10, TimeUnit.MILLISECONDS)
+                                      .tag("readings");
+        
+        // Build the parallel analytic pipelines flow
+        TStream<String> results = 
+            PlumbingStreams.parallel(readings, width, splitter, pipeline())
+                .tag("results");
+        
+        // Print out the results.
+        results.sink(tuple -> System.out.println(new Date().toString() + "   " + tuple));
+
+        System.out.println("Notice that "+width+" results are generated every second - one from each parallel channel."
+            + "\nOnly one result would be generated each second if performed serially.");
+        dp.submit(top);
+    }
+    
+    /** Function to create analytic pipeline and add it to a stream */
+    private static BiFunction<TStream<Double>,Integer,TStream<String>> pipeline() {
+        // a simple 3 stage pipeline simulating some amount of work by sleeping
+        return (stream, channel) -> 
+          { 
+            String tagPrefix = "pipeline-ch"+channel;
+            return stream.map(tuple -> {
+                sleep(1000, TimeUnit.MILLISECONDS);
+                return "This is the "+tagPrefix+" result for tuple "+tuple;
+              }).tag(tagPrefix+".stage1")
+              .map(Functions.identity()).tag(tagPrefix+".stage2")
+              .map(Functions.identity()).tag(tagPrefix+".stage3");
+          };
+    }
+
+    private static void sleep(long period, TimeUnit unit) throws RuntimeException {
+        try {
+            Thread.sleep(unit.toMillis(period));
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted", e);
+        }
+    }
+
+}
+```