METRON-1657 Parser aggregation in storm (justinleet) closes apache/metron#1099
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
index 40b01f1..4569a23 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormAdminServiceImpl.java
@@ -50,17 +50,28 @@
         TopologyResponse topologyResponse = new TopologyResponse();
         if (globalConfigService.get() == null) {
             topologyResponse.setErrorMessage(TopologyStatusCode.GLOBAL_CONFIG_MISSING.toString());
-        } else if (sensorParserConfigService.findOne(name) == null) {
-            topologyResponse.setErrorMessage(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING.toString());
-        } else {
-            topologyResponse = createResponse(stormCLIClientWrapper.startParserTopology(name), TopologyStatusCode.STARTED, TopologyStatusCode.START_ERROR);
+            return topologyResponse;
         }
-        return topologyResponse;
+
+        String[] sensorTypes = name.split(",");
+        for (String sensorType : sensorTypes) {
+            if (sensorParserConfigService.findOne(sensorType.trim()) == null) {
+                topologyResponse
+                    .setErrorMessage(TopologyStatusCode.SENSOR_PARSER_CONFIG_MISSING.toString());
+                return topologyResponse;
+            }
+        }
+
+        return createResponse(
+            stormCLIClientWrapper.startParserTopology(name),
+                TopologyStatusCode.STARTED,
+                TopologyStatusCode.START_ERROR
+        );
     }
 
     @Override
     public TopologyResponse stopParserTopology(String name, boolean stopNow) throws RestException {
-        return createResponse(stormCLIClientWrapper.stopParserTopology(name, stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
+        return createResponse(stormCLIClientWrapper.stopParserTopology(name.replaceAll(",", "__"), stopNow), TopologyStatusCode.STOPPED, TopologyStatusCode.STOP_ERROR);
     }
 
     @Override
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index 26049dd..86e3512 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -17,14 +17,8 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.utils.KafkaUtils;
-import org.apache.metron.rest.MetronRestConstants;
-import org.apache.metron.rest.RestException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.core.env.Environment;
+import static java.util.stream.Collectors.toList;
+import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -34,9 +28,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import static java.util.stream.Collectors.toList;
-import static org.apache.metron.rest.MetronRestConstants.ENRICHMENT_TOPOLOGY_NAME;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 
 public class StormCLIWrapper {
 
@@ -99,13 +98,13 @@
     return exitValue;
   }
 
-  protected String[] getParserStartCommand(String name) {
+  protected String[] getParserStartCommand(String names) {
     List<String> command = new ArrayList<>();
     command.add( environment.getProperty(MetronRestConstants.PARSER_SCRIPT_PATH_SPRING_PROPERTY));
 
     // sensor type
     command.add( "-s");
-    command.add( name);
+    command.add( names);
 
     // zookeeper
     command.add( "-z");
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 1cb4e2e..14ce50b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -28,18 +28,12 @@
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   protected final ParserConfigurations configurations = new ParserConfigurations();
-  private String sensorType;
-  public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
+  public ConfiguredParserBolt(String zookeeperUrl) {
     super(zookeeperUrl, "PARSERS");
-    this.sensorType = sensorType;
   }
 
-  protected SensorParserConfig getSensorParserConfig() {
+  protected SensorParserConfig getSensorParserConfig(String sensorType) {
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  public String getSensorType() {
-    return sensorType;
-  }
-
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 4cc4e61..d3eca42 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -125,6 +125,7 @@
 
   /**
    * The parallelism of the Kafka spout.
+   * If multiple sensors are specified, each sensor will use it's own configured value.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -132,6 +133,7 @@
 
   /**
    * The number of tasks for the Kafka spout.
+   * If multiple sensors are specified, each sensor will use it's own configured value.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -139,6 +141,7 @@
 
   /**
    * The parallelism of the parser bolt.
+   * If multiple sensors are defined, the last one's config will win.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -146,6 +149,7 @@
 
   /**
    * The number of tasks for the parser bolt.
+   * If multiple sensors are defined, the last one's config will win.
    *
    * <p>This property can be overridden on the CLI.
    */
@@ -174,6 +178,7 @@
 
   /**
    * The Kafka security protocol.
+   * If multiple sensors are defined, any non PLAINTEXT configuration will be used.
    *
    * <p>This property can be overridden on the CLI.  This property can also be overridden by the spout config.
    */
@@ -199,6 +204,7 @@
 
   /**
    * Configures the cache that backs stellar field transformations.
+   * If there are multiple sensors, the configs are merged, and the last non-empty config wins.
    *
    * <ul>
    *   <li>stellar.cache.maxSize - The maximum number of elements in the cache.
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
index bc02c5c..0493be6 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/error/MetronError.java
@@ -17,26 +17,31 @@
  */
 package org.apache.metron.common.error;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.metron.common.Constants.ERROR_TYPE;
+import static org.apache.metron.common.Constants.ErrorFields;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.Constants.ErrorType;
 import org.apache.metron.common.utils.HashUtils;
+import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.metron.common.Constants.ERROR_TYPE;
-import static org.apache.metron.common.Constants.ErrorFields;
-
 public class MetronError {
 
   private String message;
   private Throwable throwable;
-  private String sensorType = ERROR_TYPE;
+  private Set<String> sensorTypes = Collections.singleton(ERROR_TYPE);
   private ErrorType errorType = ErrorType.DEFAULT_ERROR;
   private Set<String> errorFields;
   private List<Object> rawMessages;
@@ -51,8 +56,8 @@
     return this;
   }
 
-  public MetronError withSensorType(String sensorType) {
-    this.sensorType = sensorType;
+  public MetronError withSensorType(Set<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
     return this;
   }
 
@@ -91,7 +96,12 @@
     JSONObject errorMessage = new JSONObject();
     errorMessage.put(Constants.GUID, UUID.randomUUID().toString());
     errorMessage.put(Constants.SENSOR_TYPE, "error");
-    errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorType);
+    if (sensorTypes.size() == 1) {
+      errorMessage.put(ErrorFields.FAILED_SENSOR_TYPE.getName(), sensorTypes.iterator().next());
+    } else {
+      errorMessage
+          .put(ErrorFields.FAILED_SENSOR_TYPE.getName(), new JSONArray().addAll(sensorTypes));
+    }
     errorMessage.put(ErrorFields.ERROR_TYPE.getName(), errorType.getType());
 
     addMessageString(errorMessage);
@@ -184,34 +194,42 @@
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
     MetronError that = (MetronError) o;
 
-    if (message != null ? !message.equals(that.message) : that.message != null)
+    if (message != null ? !message.equals(that.message) : that.message != null) {
       return false;
-    if (throwable != null ? !throwable.equals(that.throwable) : that.throwable != null)
+    }
+    if (getThrowable() != null ? !getThrowable().equals(that.getThrowable())
+        : that.getThrowable() != null) {
       return false;
-    if (sensorType != null ? !sensorType.equals(that.sensorType) : that.sensorType != null)
+    }
+    if (sensorTypes != null ? !sensorTypes.equals(that.sensorTypes) : that.sensorTypes != null) {
       return false;
-    if (errorType != null ? !errorType.equals(that.errorType) : that.errorType != null)
+    }
+    if (errorType != that.errorType) {
       return false;
-    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null)
+    }
+    if (errorFields != null ? !errorFields.equals(that.errorFields) : that.errorFields != null) {
       return false;
+    }
     return rawMessages != null ? rawMessages.equals(that.rawMessages) : that.rawMessages == null;
-
   }
 
   @Override
   public int hashCode() {
     int result = message != null ? message.hashCode() : 0;
-    result = 31 * result + (throwable != null ? throwable.hashCode() : 0);
-    result = 31 * result + (sensorType != null ? sensorType.hashCode() : 0);
+    result = 31 * result + (getThrowable() != null ? getThrowable().hashCode() : 0);
+    result = 31 * result + (sensorTypes != null ? sensorTypes.hashCode() : 0);
     result = 31 * result + (errorType != null ? errorType.hashCode() : 0);
     result = 31 * result + (errorFields != null ? errorFields.hashCode() : 0);
     result = 31 * result + (rawMessages != null ? rawMessages.hashCode() : 0);
     return result;
   }
-
 }
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index 27b0469..3deba78 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -46,7 +46,7 @@
   public static class StandAloneConfiguredParserBolt extends ConfiguredParserBolt {
 
     public StandAloneConfiguredParserBolt(String zookeeperUrl) {
-      super(zookeeperUrl, null);
+      super(zookeeperUrl);
     }
 
     @Override
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
index e7390de..177a232 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/error/MetronErrorTest.java
@@ -17,20 +17,20 @@
  */
 package org.apache.metron.common.error;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.collect.Sets;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class MetronErrorTest {
 
   private JSONObject message1 = new JSONObject();
@@ -47,7 +47,7 @@
     MetronError error = new MetronError()
             .withMessage("test message")
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
-            .withSensorType("sensorType");
+            .withSensorType(Collections.singleton("sensorType"));
 
     JSONObject errorJSON = error.getJSONObject();
     assertEquals("test message", errorJSON.get(Constants.ErrorFields.MESSAGE.getName()));
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 43bcc4a..0e428e3 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -82,6 +82,13 @@
 (e.g. stacktrace) and original message causing the error and sent to an
 `error` queue.  Invalid messages as determined by global validation
 functions are also treated as errors and sent to an `error` queue. 
+
+Multiple sensors can be aggregated into a single Storm topology. When this is done, there will be
+multiple Kafka spouts, but only a single parser bolt which will handle delegating to the correct 
+parser as needed. There are some constraints around this, in particular regarding some configuration.
+Additionally, all sensors must flow to the same error topic. The Kafka topic is retrieved from the input Tuple itself.
+
+A worked example of this can be found in the [Parser Chaining use case](../../use-cases/parser_chaining/README.md#aggregated-parsers-with-parser-chaining).
  
 ## Message Format
 
@@ -101,7 +108,7 @@
 * timestamp (epoch)
 * original_string: A human friendly string representation of the message
 
-The timestamp and original_string fields are madatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
+The timestamp and original_string fields are mandatory. The remaining standard fields are optional.  If any of the optional fields are not applicable then the field should be left out of the JSON.
 
 So putting it all together a typical Metron message with all 5-tuple fields present would look like the following:
 
@@ -138,6 +145,8 @@
 * `raw_message_bytes` : The raw message bytes
 * `error_hash` : A hash of the error message
 
+When aggregating multiple sensors, all sensors must be using the same error topic.
+
 ## Parser Configuration
 
 The configuration for the various parser topologies is defined by JSON
@@ -170,16 +179,16 @@
     parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
   * The kafka writer can be configured within the parser config as well.  (This is all configured a priori, but this is convenient for overriding the settings).  See [here](../metron-writer/README.md#kafka-writer)
 * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
-* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line.
-* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line.
-* `parserParallelism` : The parser bolt parallelism (default to `1`). This can be overridden on the command line.
-* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). This can be overridden on the command line.
+* `spoutParallelism` : The kafka spout parallelism (default to `1`).  This can be overridden on the command line, and if there are multiple sensors should be in a comma separated list in the same order as the sensors.
+* `spoutNumTasks` : The number of tasks for the spout (default to `1`). This can be overridden on the command line, and if there are multiple sensors should be in a comma separated list in the same order as the sensors.
+* `parserParallelism` : The parser bolt parallelism (default to `1`). If there are multiple sensors, the last one's configuration will be used. This can be overridden on the command line.
+* `parserNumTasks` : The number of tasks for the parser bolt (default to `1`). If there are multiple sensors, the last one's configuration will be used. This can be overridden on the command line.
 * `errorWriterParallelism` : The error writer bolt parallelism (default to `1`). This can be overridden on the command line.
 * `errorWriterNumTasks` : The number of tasks for the error writer bolt (default to `1`). This can be overridden on the command line.
 * `numWorkers` : The number of workers to use in the topology (default is the storm default of `1`).
 * `numAckers` : The number of acker executors to use in the topology (default is the storm default of `1`).
-* `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line.
-* `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence.
+* `spoutConfig` : A map representing a custom spout config (this is a map). If there are multiple sensors, the configs will be merged with the last specified taking precedence. This can be overridden on the command line.
+* `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence. If multiple sensors are used, any non "PLAINTEXT" value will be used.
 * `stormConfig` : The storm config to use (this is a map).  This can be overridden on the command line.  If both are specified, they are merged with CLI properties taking precedence.
 * `cacheConfig` : Cache config for stellar field transformations.   This configures a least frequently used cache.  This is a map with the following keys.  If not explicitly configured (the default), then no cache will be used.
   * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to not use a cache.
@@ -598,6 +607,8 @@
 Default installed Metron is untuned for production deployment.  There
 are a few knobs to tune to get the most out of your system.
 
+When using aggregated parsers, it's highly recommended to aggregate parsers with similar velocity and parser complexity together.
+
 # Notes on Adding a New Sensor
 In order to allow for meta alerts to be queries alongside regular alerts in Elasticsearch 2.x,
 it is necessary to add an additional field to the templates and mapping for existing sensors.
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index f68c670..213d02c 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -19,7 +19,6 @@
 
 
 import com.github.benmanes.caffeine.cache.Cache;
-import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -27,6 +26,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -42,15 +42,17 @@
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration;
 import org.apache.metron.writer.WriterToBulkWriter;
 import org.apache.metron.writer.bolt.BatchTimeoutHelper;
 import org.apache.storm.Config;
@@ -69,10 +71,9 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private OutputCollector collector;
-  private MessageParser<JSONObject> parser;
-  //default filter is noop, so pass everything through.
-  private MessageFilter<JSONObject> filter;
-  private WriterHandler writer;
+  private Map<String, ParserComponents> sensorToComponentMap;
+  private Map<String, String> topicToSensorMap = new HashMap<>();
+
   private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
   private transient Cache<CachingStellarProcessor.Key, Object> cache;
@@ -81,20 +82,21 @@
   private int batchTimeoutDivisor = 1;
 
   public ParserBolt( String zookeeperUrl
-                   , String sensorType
-                   , MessageParser<JSONObject> parser
-                   , WriterHandler writer
-  )
-  {
-    super(zookeeperUrl, sensorType);
-    this.writer = writer;
-    this.parser = parser;
-  }
+                   , Map<String, ParserComponents> sensorToComponentMap
+  ) {
+    super(zookeeperUrl);
+    this.sensorToComponentMap = sensorToComponentMap;
 
-
-  public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
-    this.filter = filter;
-    return this;
+    // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
+    Boolean handleAcks = null;
+    for (Map.Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
+      boolean writerHandleAck = entry.getValue().getWriter().handleAck();
+      if (handleAcks == null) {
+        handleAcks = writerHandleAck;
+      } else if (!handleAcks.equals(writerHandleAck)) {
+        throw new IllegalArgumentException("All writers must match when calling handleAck()");
+      }
+    }
   }
 
   /**
@@ -137,8 +139,8 @@
     return defaultBatchTimeout;
   }
 
-  public MessageParser<JSONObject> getParser() {
-    return parser;
+  public Map<String, ParserComponents> getSensorToComponentMap() {
+    return sensorToComponentMap;
   }
 
   /**
@@ -153,14 +155,15 @@
     // to get the valid WriterConfiguration.  But don't store any non-serializable objects,
     // else Storm will throw a runtime error.
     Function<WriterConfiguration, WriterConfiguration> configurationXform;
-    if(writer.isWriterToBulkWriter()) {
+    WriterHandler writer = sensorToComponentMap.entrySet().iterator().next().getValue().getWriter();
+    if (writer.isWriterToBulkWriter()) {
       configurationXform = WriterToBulkWriter.TRANSFORMATION;
-    }
-    else {
+    } else {
       configurationXform = x -> x;
     }
     WriterConfiguration writerconf = configurationXform
-        .apply(getConfigurationStrategy().createWriterConfig(writer.getBulkMessageWriter(), getConfigurations()));
+        .apply(getConfigurationStrategy()
+            .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations()));
 
     BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
     this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
@@ -182,40 +185,61 @@
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
-    if(getSensorParserConfig() != null) {
-      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
-    }
-    initializeStellar();
-    if(getSensorParserConfig() != null && filter == null) {
-      getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
-      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
-        filter = Filters.get(getSensorParserConfig().getFilterClassName()
-                , getSensorParserConfig().getParserConfig()
-        );
+
+    // Build the Stellar cache
+    Map<String, Object> cacheConfig = new HashMap<>();
+    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+      String sensor = entry.getKey();
+      SensorParserConfig config = getSensorParserConfig(sensor);
+
+      if (config != null) {
+        cacheConfig.putAll(config.getCacheConfig());
       }
     }
+    cache = CachingStellarProcessor.createCache(cacheConfig);
 
-    parser.init();
+    // Need to prep all sensors
+    for (Map.Entry<String, ParserComponents> entry: sensorToComponentMap.entrySet()) {
+      String sensor = entry.getKey();
+      MessageParser<JSONObject> parser = entry.getValue().getMessageParser();
 
-    writer.init(stormConf, context, collector, getConfigurations());
-    if (defaultBatchTimeout == 0) {
-      //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
-      //probably because we are in a unit test scenario.  So calculate it here.
-      WriterConfiguration writerConfig = getConfigurationStrategy()
-          .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
-      BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
-      defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
-    }
-    writer.setDefaultBatchTimeout(defaultBatchTimeout);
+      initializeStellar();
+      if (getSensorParserConfig(sensor) != null && sensorToComponentMap.get(sensor).getFilter() == null) {
+        getSensorParserConfig(sensor).getParserConfig().putIfAbsent("stellarContext", stellarContext);
+        if (!StringUtils.isEmpty(getSensorParserConfig(sensor).getFilterClassName())) {
+          MessageFilter<JSONObject> filter = Filters.get(
+              getSensorParserConfig(sensor).getFilterClassName(),
+              getSensorParserConfig(sensor).getParserConfig()
+          );
+          getSensorToComponentMap().get(sensor).setFilter(filter);
+        }
+      }
 
-    SensorParserConfig config = getSensorParserConfig();
-    if(config != null) {
-      config.init();
+      parser.init();
+
+      SensorParserConfig config = getSensorParserConfig(sensor);
+      if (config != null) {
+        config.init();
+        topicToSensorMap.put(config.getSensorTopic(), sensor);
+      } else {
+        throw new IllegalStateException(
+            "Unable to retrieve a parser config for " + sensor);
+      }
+      parser.configure(config.getParserConfig());
+
+      WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+      writer.init(stormConf, context, collector, getConfigurations());
+      if (defaultBatchTimeout == 0) {
+        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+        //probably because we are in a unit test scenario.  So calculate it here.
+        WriterConfiguration writerConfig = getConfigurationStrategy()
+            .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
+        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(
+            writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
+        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+      }
+      writer.setDefaultBatchTimeout(defaultBatchTimeout);
     }
-    else {
-      throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
-    }
-    parser.configure(config.getParserConfig());
   }
 
   protected void initializeStellar() {
@@ -237,7 +261,9 @@
   public void execute(Tuple tuple) {
     if (TupleUtils.isTick(tuple)) {
       try {
-        writer.flush(getConfigurations(), messageGetStrategy);
+        for (Entry<String, ParserComponents> entry : sensorToComponentMap.entrySet()) {
+          entry.getValue().getWriter().flush(getConfigurations(), messageGetStrategy);
+        }
       } catch (Exception e) {
         throw new RuntimeException(
             "This should have been caught in the writerHandler.  If you see this, file a JIRA", e);
@@ -246,69 +272,101 @@
       }
       return;
     }
-    SensorParserConfig sensorParserConfig = getSensorParserConfig();
+
     byte[] originalMessage = (byte[]) messageGetStrategy.get(tuple);
     try {
-      //we want to ack the tuple in the situation where we have are not doing a bulk write
-      //otherwise we want to defer to the writerComponent who will ack on bulk commit.
-      boolean ackTuple = !writer.handleAck();
+      SensorParserConfig sensorParserConfig;
+      MessageParser<JSONObject> parser;
+      String sensor;
+      Map<String, Object> metadata;
+      if (sensorToComponentMap.size() == 1) {
+        // There's only one parser, so grab info directly
+        Entry<String, ParserComponents> sensorParser = sensorToComponentMap.entrySet().iterator()
+            .next();
+        sensor = sensorParser.getKey();
+        parser = sensorParser.getValue().getMessageParser();
+        sensorParserConfig = getSensorParserConfig(sensor);
+      } else {
+        // There's multiple parsers, so pull the topic from the Tuple and look up the sensor
+        String topic = tuple.getStringByField(FieldsConfiguration.TOPIC.getFieldName());
+        sensor = topicToSensorMap.get(topic);
+        parser = sensorToComponentMap.get(sensor).getMessageParser();
+        sensorParserConfig = getSensorParserConfig(sensor);
+      }
+
+      List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
+      boolean ackTuple = false;
       int numWritten = 0;
-      if(sensorParserConfig != null) {
+      if (sensorParserConfig != null) {
         RawMessage rawMessage = RawMessageUtil.INSTANCE.getRawMessage( sensorParserConfig.getRawMessageStrategy()
-                                                                   , tuple
-                                                                   , originalMessage
-                                                                   , sensorParserConfig.getReadMetadata()
-                                                                   , sensorParserConfig.getRawMessageStrategyConfig()
-                                                                   );
-        Map<String, Object> metadata = rawMessage.getMetadata();
-        List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
+            , tuple
+            , originalMessage
+            , sensorParserConfig.getReadMetadata()
+            , sensorParserConfig.getRawMessageStrategyConfig()
+        );
+        metadata = rawMessage.getMetadata();
 
         Optional<List<JSONObject>> messages = parser.parseOptional(rawMessage.getMessage());
         for (JSONObject message : messages.orElse(Collections.emptyList())) {
-          sensorParserConfig.getRawMessageStrategy().mergeMetadata( message
-                                                                  , metadata
-                                                                  , sensorParserConfig.getMergeMetadata()
-                                                                  , sensorParserConfig.getRawMessageStrategyConfig()
-                                                                  );
-          message.put(Constants.SENSOR_TYPE, getSensorType());
+          //we want to ack the tuple in the situation where we have are not doing a bulk write
+          //otherwise we want to defer to the writerComponent who will ack on bulk commit.
+          WriterHandler writer = sensorToComponentMap.get(sensor).getWriter();
+          ackTuple = !writer.handleAck();
+
+          sensorParserConfig.getRawMessageStrategy().mergeMetadata(
+              message,
+              metadata,
+              sensorParserConfig.getMergeMetadata(),
+              sensorParserConfig.getRawMessageStrategyConfig()
+          );
+          message.put(Constants.SENSOR_TYPE, sensor);
+
           for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
             if (handler != null) {
-              if(!sensorParserConfig.getMergeMetadata()) {
+              if (!sensorParserConfig.getMergeMetadata()) {
                 //if we haven't merged metadata, then we need to pass them along as configuration params.
-                handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig(), metadata);
-              }
-              else {
-                handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig());
+                handler.transformAndUpdate(
+                    message,
+                    stellarContext,
+                    sensorParserConfig.getParserConfig(),
+                    metadata
+                );
+              } else {
+                handler.transformAndUpdate(
+                    message,
+                    stellarContext,
+                    sensorParserConfig.getParserConfig()
+                );
               }
             }
           }
-          if(!message.containsKey(Constants.GUID)) {
+          if (!message.containsKey(Constants.GUID)) {
             message.put(Constants.GUID, UUID.randomUUID().toString());
           }
 
+          MessageFilter<JSONObject> filter = sensorToComponentMap.get(sensor).getFilter();
           if (filter == null || filter.emitTuple(message, stellarContext)) {
             boolean isInvalid = !parser.validate(message);
             List<FieldValidator> failedValidators = null;
-            if(!isInvalid) {
+            if (!isInvalid) {
               failedValidators = getFailedValidators(message, fieldValidations);
               isInvalid = !failedValidators.isEmpty();
             }
-            if( isInvalid) {
+            if (isInvalid) {
               MetronError error = new MetronError()
-                      .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                      .withSensorType(getSensorType())
-                      .addRawMessage(message);
-              Set<String> errorFields = failedValidators == null?null:failedValidators.stream()
-                      .flatMap(fieldValidator -> fieldValidator.getInput().stream())
-                      .collect(Collectors.toSet());
+                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                  .withSensorType(Collections.singleton(sensor))
+                  .addRawMessage(message);
+              Set<String> errorFields = failedValidators == null ? null : failedValidators.stream()
+                  .flatMap(fieldValidator -> fieldValidator.getInput().stream())
+                  .collect(Collectors.toSet());
               if (errorFields != null && !errorFields.isEmpty()) {
                 error.withErrorFields(errorFields);
               }
               ErrorUtils.handleError(collector, error);
-            }
-            else {
+            } else {
               numWritten++;
-              writer.write(getSensorType(), tuple, message, getConfigurations(), messageGetStrategy);
+              writer.write(sensor, tuple, message, getConfigurations(), messageGetStrategy);
             }
           }
         }
@@ -316,9 +374,10 @@
       //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
       //(meaning that none of the messages are valid either globally or locally)
       //then we want to handle the ack ourselves.
-      if(ackTuple || numWritten == 0) {
+      if (ackTuple || numWritten == 0) {
         collector.ack(tuple);
       }
+
     } catch (Throwable ex) {
       handleError(originalMessage, tuple, ex, collector);
     }
@@ -328,7 +387,7 @@
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(ex)
-            .withSensorType(getSensorType())
+            .withSensorType(sensorToComponentMap.keySet())
             .addRawMessage(originalMessage);
     ErrorUtils.handleError(collector, error);
     collector.ack(tuple);
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index ef93ba2..fdfceda 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.bolt;
 
+import java.util.Collections;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.error.MetronError;
@@ -84,7 +85,7 @@
       MetronError error = new MetronError()
               .withErrorType(errorType)
               .withThrowable(e)
-              .withSensorType(sensorType)
+              .withSensorType(Collections.singleton(sensorType))
               .addRawMessage(message);
       ErrorUtils.handleError(collector, error);
       collector.ack(tuple);
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
new file mode 100644
index 0000000..32d56b9
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserComponents.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.parsers.topology;
+
+import java.io.Serializable;
+import org.apache.metron.parsers.bolt.WriterHandler;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.json.simple.JSONObject;
+
+public class ParserComponents implements Serializable {
+  private static final long serialVersionUID = 7880346740026374665L;
+
+  private MessageParser<JSONObject> messageParser;
+  private MessageFilter<JSONObject> filter;
+  private WriterHandler writer;
+
+  public ParserComponents(
+      MessageParser<JSONObject> messageParser,
+      MessageFilter<JSONObject> filter,
+      WriterHandler writer) {
+    this.messageParser = messageParser;
+    this.filter = filter;
+    this.writer = writer;
+  }
+
+  public MessageParser<JSONObject> getMessageParser() {
+    return messageParser;
+  }
+
+  public MessageFilter<JSONObject> getFilter() {
+    return filter;
+  }
+
+  public WriterHandler getWriter() {
+    return writer;
+  }
+
+  public void setMessageParser(
+      MessageParser<JSONObject> messageParser) {
+    this.messageParser = messageParser;
+  }
+
+  public void setFilter(
+      MessageFilter<JSONObject> filter) {
+    this.filter = filter;
+  }
+
+  public void setWriter(WriterHandler writer) {
+    this.writer = writer;
+  }
+}
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 5b3e0d5..d20e1a5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -17,35 +17,45 @@
  */
 package org.apache.metron.parsers.topology;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.metron.common.utils.KafkaUtils;
-import org.apache.metron.parsers.topology.config.ValueSupplier;
-import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
-import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
-import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.topology.TopologyBuilder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
+import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
+import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
 import org.apache.metron.writer.AbstractWriter;
 import org.apache.metron.writer.kafka.KafkaWriter;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
 import org.json.simple.JSONObject;
 
-import java.util.*;
-
 /**
  * Builds a Storm topology that parses telemetry data received from a sensor.
  */
@@ -75,7 +85,7 @@
    *
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
-   * @param sensorType               Type of sensor
+   * @param sensorTypes               Type of sensor
    * @param spoutParallelismSupplier         Supplier for the parallelism hint for the spout
    * @param spoutNumTasksSupplier            Supplier for the number of tasks for the spout
    * @param parserParallelismSupplier        Supplier for the parallelism hint for the parser bolt
@@ -91,14 +101,14 @@
    */
   public static ParserTopology build(String zookeeperUrl,
                                       Optional<String> brokerUrl,
-                                      String sensorType,
-                                      ValueSupplier<Integer> spoutParallelismSupplier,
-                                      ValueSupplier<Integer> spoutNumTasksSupplier,
+                                      List<String> sensorTypes,
+                                      ValueSupplier<List> spoutParallelismSupplier,
+                                      ValueSupplier<List> spoutNumTasksSupplier,
                                       ValueSupplier<Integer> parserParallelismSupplier,
                                       ValueSupplier<Integer> parserNumTasksSupplier,
                                       ValueSupplier<Integer> errorWriterParallelismSupplier,
                                       ValueSupplier<Integer> errorWriterNumTasksSupplier,
-                                      ValueSupplier<Map> kafkaSpoutConfigSupplier,
+                                      ValueSupplier<List> kafkaSpoutConfigSupplier,
                                       ValueSupplier<String> securityProtocolSupplier,
                                       ValueSupplier<String> outputTopicSupplier,
                                       ValueSupplier<String> errorTopicSupplier,
@@ -107,40 +117,72 @@
 
     // fetch configuration from zookeeper
     ParserConfigurations configs = new ParserConfigurations();
-    SensorParserConfig parserConfig = getSensorParserConfig(zookeeperUrl, sensorType, configs);
-    int spoutParallelism = spoutParallelismSupplier.get(parserConfig, Integer.class);
-    int spoutNumTasks = spoutNumTasksSupplier.get(parserConfig, Integer.class);
-    int parserParallelism = parserParallelismSupplier.get(parserConfig, Integer.class);
-    int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
-    int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
-    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
-    String outputTopic = outputTopicSupplier.get(parserConfig, String.class);
+    Map<String, SensorParserConfig> sensorToParserConfigs = getSensorParserConfig(zookeeperUrl, sensorTypes, configs);
+    Collection<SensorParserConfig> parserConfigs = sensorToParserConfigs.values();
 
-    Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
-    Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
+    @SuppressWarnings("unchecked")
+    List<Integer> spoutParallelism = (List<Integer>) spoutParallelismSupplier.get(parserConfigs, List.class);
+    @SuppressWarnings("unchecked")
+    List<Integer> spoutNumTasks = (List<Integer>) spoutNumTasksSupplier.get(parserConfigs, List.class);
+    int parserParallelism = parserParallelismSupplier.get(parserConfigs, Integer.class);
+    int parserNumTasks = parserNumTasksSupplier.get(parserConfigs, Integer.class);
+    int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfigs, Integer.class);
+    int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfigs, Integer.class);
+    String outputTopic = outputTopicSupplier.get(parserConfigs, String.class);
+
+    List<Map<String, Object>> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfigs, List.class);
+    Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfigs, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
-    builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
-            .setNumTasks(spoutNumTasks);
+    int i = 0;
+    List<String> spoutIds = new ArrayList<>();
+    for (Entry<String, SensorParserConfig> entry: sensorToParserConfigs.entrySet()) {
+      KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, entry.getKey(), securityProtocol,
+          Optional.ofNullable(kafkaSpoutConfig.get(i)), entry.getValue());
+      String spoutId = sensorToParserConfigs.size() > 1 ? "kafkaSpout-" + entry.getKey() : "kafkaSpout";
+      builder.setSpout(spoutId, kafkaSpout, spoutParallelism.get(i))
+          .setNumTasks(spoutNumTasks.get(i));
+      spoutIds.add(spoutId);
+      ++i;
+    }
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic));
-    builder.setBolt("parserBolt", parserBolt, parserParallelism)
-            .setNumTasks(parserNumTasks)
-            .localOrShuffleGrouping("kafkaSpout");
+    ParserBolt parserBolt = createParserBolt(
+        zookeeperUrl,
+        brokerUrl,
+        sensorToParserConfigs,
+        securityProtocol,
+        configs,
+        Optional.ofNullable(outputTopic)
+    );
+
+    BoltDeclarer boltDeclarer = builder
+        .setBolt("parserBolt", parserBolt, parserParallelism)
+        .setNumTasks(parserNumTasks);
+
+    for (String spoutId : spoutIds) {
+      boltDeclarer.localOrShuffleGrouping(spoutId);
+    }
 
     // create the error bolt, if needed
     if (errorWriterNumTasks > 0) {
-      String errorTopic = errorTopicSupplier.get(parserConfig, String.class);
-      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic);
+      String errorTopic = errorTopicSupplier.get(parserConfigs, String.class);
+      WriterBolt errorBolt = createErrorBolt(
+          zookeeperUrl,
+          brokerUrl,
+          sensorTypes.get(0),
+          securityProtocol,
+          configs,
+          parserConfigs.iterator().next(),
+          errorTopic
+      );
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
               .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
     }
 
-    return new ParserTopology(builder, stormConfigSupplier.get(parserConfig, Config.class));
+    return new ParserTopology(builder, stormConfigSupplier.get(parserConfigs, Config.class));
   }
 
   /**
@@ -216,42 +258,62 @@
    *
    * @param zookeeperUrl Zookeeper URL
    * @param brokerUrl    Kafka Broker URL
-   * @param sensorType   Type of sensor that is being consumed.
+   * @param sensorTypeToParserConfig
    * @param configs
-   * @param parserConfig
    * @return A Storm bolt that parses input from a sensor
    */
   private static ParserBolt createParserBolt( String zookeeperUrl,
                                               Optional<String> brokerUrl,
-                                              String sensorType,
+                                              Map<String, SensorParserConfig> sensorTypeToParserConfig,
                                               Optional<String> securityProtocol,
                                               ParserConfigurations configs,
-                                              SensorParserConfig parserConfig,
                                               Optional<String> outputTopic) {
 
-    // create message parser
-    MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
-    parser.configure(parserConfig.getParserConfig());
+    Map<String, ParserComponents> parserBoltConfigs = new HashMap<>();
+    for( Entry<String, SensorParserConfig> entry : sensorTypeToParserConfig.entrySet()) {
+      String sensorType = entry.getKey();
+      SensorParserConfig parserConfig = entry.getValue();
+      // create message parser
+      MessageParser<JSONObject> parser = ReflectionUtils
+          .createInstance(parserConfig.getParserClassName());
+      parser.configure(parserConfig.getParserConfig());
 
-    // create a writer
-    AbstractWriter writer;
-    if(parserConfig.getWriterClassName() == null) {
+      // create message filter
+      MessageFilter<JSONObject> filter = null;
+      if (!StringUtils.isEmpty(parserConfig.getFilterClassName())) {
+        filter = Filters.get(
+            parserConfig.getFilterClassName(),
+            parserConfig.getParserConfig()
+        );
+      }
 
-      // if not configured, use a sensible default
-      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
-              .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+      // create a writer
+      AbstractWriter writer;
+      if (parserConfig.getWriterClassName() == null) {
 
-    } else {
-      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+        // if not configured, use a sensible default
+        writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+            .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+
+      } else {
+        writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+      }
+
+      // configure it
+      writer.configure(sensorType, new ParserWriterConfiguration(configs));
+
+      // create a writer handler
+      WriterHandler writerHandler = createWriterHandler(writer);
+
+      ParserComponents components = new ParserComponents(
+         parser,
+         filter,
+         writerHandler
+      );
+      parserBoltConfigs.put(sensorType, components);
     }
 
-    // configure it
-    writer.configure(sensorType, new ParserWriterConfiguration(configs));
-
-    // create a writer handler
-    WriterHandler writerHandler = createWriterHandler(writer);
-
-    return new ParserBolt(zookeeperUrl, sensorType, parser, writerHandler);
+    return new ParserBolt(zookeeperUrl, parserBoltConfigs);
   }
 
   /**
@@ -304,22 +366,26 @@
    * Fetch the parser configuration from Zookeeper.
    *
    * @param zookeeperUrl Zookeeper URL
-   * @param sensorType   Type of sensor
+   * @param sensorTypes Types of sensor
    * @param configs
    * @return
    * @throws Exception
    */
-  private static SensorParserConfig getSensorParserConfig(String zookeeperUrl, String sensorType, ParserConfigurations configs) throws Exception {
+  private static Map<String, SensorParserConfig> getSensorParserConfig(String zookeeperUrl, List<String> sensorTypes, ParserConfigurations configs) throws Exception {
+    Map<String, SensorParserConfig> parserConfigs = new HashMap<>();
     try(CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl)) {
       client.start();
       ConfigurationsUtils.updateParserConfigsFromZookeeper(configs, client);
-      SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
-      if (parserConfig == null) {
-        throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
-                "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+      for (String sensorType : sensorTypes) {
+        SensorParserConfig parserConfig = configs.getSensorParserConfig(sensorType);
+        if (parserConfig == null) {
+          throw new IllegalStateException("Cannot find the parser configuration in zookeeper for " + sensorType + "." +
+                  "  Please check that it exists in zookeeper by using the 'zk_load_configs.sh -m DUMP' command.");
+        }
+        parserConfigs.put(sensorType, parserConfig);
       }
-      return parserConfig;
     }
+    return parserConfigs;
   }
 
   /**
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index f60ff44..eb39f89 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,27 +17,44 @@
  */
 package org.apache.metron.parsers.topology;
 
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.metron.parsers.topology.config.Arg;
+import org.apache.metron.parsers.topology.config.ConfigHandlers;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.utils.Utils;
-import com.google.common.base.Joiner;
-import org.apache.commons.cli.*;
-import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.parsers.topology.config.Arg;
-import org.apache.metron.parsers.topology.config.ConfigHandlers;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.function.Function;
 
 public class ParserTopologyCLI {
 
+  private static final String STORM_JOB_SEPARATOR = "__";
+
   public enum ParserOptions {
     HELP("h", code -> {
       Option o = new Option(code, "help", false, "This screen");
@@ -45,7 +62,7 @@
       return o;
     }),
     ZK_QUORUM("z", code -> {
-      Option o = new Option(code, "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+      Option o = new Option(code, "zk", true, "Zookeeper Quorum URL (zk1:2181,zk2:2181,...");
       o.setArgName("ZK_QUORUM");
       o.setRequired(true);
       return o;
@@ -56,14 +73,14 @@
       o.setRequired(false);
       return o;
     }),
-    SENSOR_TYPE("s", code -> {
-      Option o = new Option(code, "sensor", true, "Sensor Type");
-      o.setArgName("SENSOR_TYPE");
+    SENSOR_TYPES("s", code -> {
+      Option o = new Option(code, "sensor", true, "Sensor Types as comma-separated list");
+      o.setArgName("SENSOR_TYPES");
       o.setRequired(true);
       return o;
     }),
     SPOUT_PARALLELISM("sp", code -> {
-      Option o = new Option(code, "spout_p", true, "Spout Parallelism Hint");
+      Option o = new Option(code, "spout_p", true, "Spout Parallelism Hint. If multiple sensors are specified, this should be a comma separated list in the same order.");
       o.setArgName("SPOUT_PARALLELISM_HINT");
       o.setRequired(false);
       o.setType(Number.class);
@@ -91,7 +108,7 @@
       return o;
     }),
     SPOUT_NUM_TASKS("snt", code -> {
-      Option o = new Option(code, "spout_num_tasks", true, "Spout Num Tasks");
+      Option o = new Option(code, "spout_num_tasks", true, "Spout Num Tasks. If multiple sensors are specified, this should be a comma separated list in the same order.");
       o.setArgName("NUM_TASKS");
       o.setRequired(false);
       o.setType(Number.class);
@@ -307,7 +324,9 @@
   public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception {
     String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);
     Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
-    String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+    String sensorTypeRaw= ParserOptions.SENSOR_TYPES.get(cmd);
+    List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(",")).map(String::trim).collect(
+        Collectors.toList());
 
     /*
      * It bears mentioning why we're creating this ValueSupplier indirection here.
@@ -328,116 +347,191 @@
      */
 
     // kafka spout parallelism
-    ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
-        return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
+        // Handle the case where there's only one and we can default reasonably
+        if( parserConfigs.size() == 1) {
+          return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1")));
+        }
+
+        // Handle the multiple explicitly passed spout parallelism's case.
+        String parallelismRaw = ParserOptions.SPOUT_PARALLELISM.get(cmd, "1");
+        List<String> parallelisms = Arrays.stream(parallelismRaw.split(",")).map(String::trim).collect(
+            Collectors.toList());
+        if (parallelisms.size() != parserConfigs.size()) {
+          throw new IllegalArgumentException("Spout parallelism should match number of sensors 1:1");
+        }
+        List<Integer> spoutParallelisms = new ArrayList<>();
+        for (String s : parallelisms) {
+          spoutParallelisms.add(Integer.parseInt(s));
+        }
+        return spoutParallelisms;
       }
-      return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
+
+      List<Integer> spoutParallelisms = new ArrayList<>();
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        spoutParallelisms.add(parserConfig.getSpoutParallelism());
+      }
+      return spoutParallelisms;
     };
 
     // kafka spout number of tasks
-    ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
-        return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
+        // Handle the case where there's only one and we can default reasonably
+        if( parserConfigs.size() == 1) {
+          return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1")));
+        }
+
+        // Handle the multiple explicitly passed spout parallelism's case.
+        String numTasksRaw = ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1");
+        List<String> numTasks = Arrays.stream(numTasksRaw.split(",")).map(String::trim).collect(
+            Collectors.toList());
+        if (numTasks.size() != parserConfigs.size()) {
+          throw new IllegalArgumentException("Spout num tasks should match number of sensors 1:1");
+        }
+        List<Integer> spoutTasksList = new ArrayList<>();
+        for (String s : numTasks) {
+          spoutTasksList.add(Integer.parseInt(s));
+        }
+        return spoutTasksList;
       }
-      return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
+
+      List<Integer> numTasks = new ArrayList<>();
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        numTasks.add(parserConfig.getSpoutNumTasks());
+      }
+      return numTasks;
     };
 
     // parser bolt parallelism
-    ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> parserParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getParserParallelism();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // parser bolt number of tasks
-    ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> parserNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getParserNumTasks();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // error bolt parallelism
-    ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> errorParallelism = (parserConfigs, clazz) -> {
       if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getErrorWriterParallelism();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // error bolt number of tasks
-    ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
+    ValueSupplier<Integer> errorNumTasks = (parserConfigs, clazz) -> {
       if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
       }
-      return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
+      int retValue = 1;
+      for (SensorParserConfig config : parserConfigs) {
+        Integer configValue = config.getErrorWriterNumTasks();
+        retValue = configValue == null ? retValue : configValue;
+      }
+      return retValue;
     };
 
     // kafka spout config
-    ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
+    ValueSupplier<List> spoutConfig = (parserConfigs, clazz) -> {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
-        return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
+        return Collections.singletonList(readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd))));
       }
-      return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
+      List<Map<String, Object>> retValue = new ArrayList<>();
+      for (SensorParserConfig config : parserConfigs) {
+        retValue.add(config.getSpoutConfig());
+      }
+      return retValue;
     };
 
     // security protocol
-    ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
+    ValueSupplier<String> securityProtocol = (parserConfigs, clazz) -> {
       Optional<String> sp = Optional.empty();
       if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
         sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd));
       }
+      // Need to adjust to handle list of spoutConfigs. Any non-plaintext wins
       if (!sp.isPresent()) {
-        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfig, Map.class));
+        sp = getSecurityProtocol(sp, spoutConfig.get(parserConfigs, List.class));
       }
-      return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
+      // Need to look through parserConfigs for any non-plaintext
+      String parserConfigSp = SecurityProtocol.PLAINTEXT.name;
+      for (SensorParserConfig config : parserConfigs) {
+        String configSp = config.getSecurityProtocol();
+        if (!SecurityProtocol.PLAINTEXT.name.equals(configSp)) {
+          // We have a winner
+          parserConfigSp = configSp;
+        }
+      }
+
+      return sp.orElse(Optional.ofNullable(parserConfigSp).orElse(null));
     };
 
     // storm configuration
-    ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
-      Map<String, Object> c = parserConfig.getStormConfig();
+    ValueSupplier<Config> stormConf = (parserConfigs, clazz) -> {
+      // Last one wins
       Config finalConfig = new Config();
-      if(c != null && !c.isEmpty()) {
-        finalConfig.putAll(c);
-      }
-      if(parserConfig.getNumAckers() != null) {
-        Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
-      }
-      if(parserConfig.getNumWorkers() != null) {
-        Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        Map<String, Object> c = parserConfig.getStormConfig();
+        if (c != null && !c.isEmpty()) {
+          finalConfig.putAll(c);
+        }
+        if (parserConfig.getNumAckers() != null) {
+          Config.setNumAckers(finalConfig, parserConfig.getNumAckers());
+        }
+        if (parserConfig.getNumWorkers() != null) {
+          Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers());
+        }
       }
       return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
     };
 
     // output topic
-    ValueSupplier<String> outputTopic = (parserConfig, clazz) -> {
-      String topic;
+    ValueSupplier<String> outputTopic = (parserConfigs, clazz) -> {
+      String topic = null;
 
       if(ParserOptions.OUTPUT_TOPIC.has(cmd)) {
         topic = ParserOptions.OUTPUT_TOPIC.get(cmd);
-
-      } else if(parserConfig.getOutputTopic() != null) {
-        topic = parserConfig.getOutputTopic();
-
-      } else {
-        topic = Constants.ENRICHMENT_TOPIC;
       }
 
       return topic;
     };
 
-    // error topic
-    ValueSupplier<String> errorTopic = (parserConfig, clazz) -> {
-      String topic;
-
-      if(parserConfig.getErrorTopic() != null) {
-        topic = parserConfig.getErrorTopic();
-
-      } else {
-        // topic will to set to the 'parser.error.topic' setting in globals when the error bolt is created
-        topic = null;
+    // Error topic will throw an exception if the topics aren't all the same.
+    ValueSupplier<String> errorTopic = (parserConfigs, clazz) -> {
+      // topic will to set to the 'parser.error.topic' setting in globals when the error bolt is created
+      String topic = null;
+      for (SensorParserConfig parserConfig : parserConfigs) {
+        String currentTopic = parserConfig.getErrorTopic();
+        if(topic != null && !topic.equals(currentTopic)) {
+          throw new IllegalArgumentException(
+              "Parser Aggregation specified with differing error topics");
+        }
+        topic = currentTopic;
       }
 
       return topic;
@@ -446,7 +540,7 @@
     return getParserTopology(
             zookeeperUrl,
             brokerUrl,
-            sensorType,
+            sensorTypes,
             spoutParallelism,
             spoutNumTasks,
             parserParallelism,
@@ -462,14 +556,14 @@
 
   protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl,
                                                                     Optional<String> brokerUrl,
-                                                                    String sensorType,
-                                                                    ValueSupplier<Integer> spoutParallelism,
-                                                                    ValueSupplier<Integer> spoutNumTasks,
+                                                                    List<String> sensorTypes,
+                                                                    ValueSupplier<List> spoutParallelism,
+                                                                    ValueSupplier<List> spoutNumTasks,
                                                                     ValueSupplier<Integer> parserParallelism,
                                                                     ValueSupplier<Integer> parserNumTasks,
                                                                     ValueSupplier<Integer> errorParallelism,
                                                                     ValueSupplier<Integer> errorNumTasks,
-                                                                    ValueSupplier<Map> spoutConfig,
+                                                                    ValueSupplier<List> spoutConfig,
                                                                     ValueSupplier<String> securityProtocol,
                                                                     ValueSupplier<Config> stormConf,
                                                                     ValueSupplier<String> outputTopic,
@@ -477,7 +571,7 @@
     return ParserTopologyBuilder.build(
             zookeeperUrl,
             brokerUrl,
-            sensorType,
+            sensorTypes,
             spoutParallelism,
             spoutNumTasks,
             parserParallelism,
@@ -505,15 +599,15 @@
       }
       ParserTopologyCLI cli = new ParserTopologyCLI();
       ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd);
-      String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
+      String sensorTypes = ParserOptions.SENSOR_TYPES.get(cmd);
       if (ParserOptions.TEST.has(cmd)) {
         topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        cluster.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
         Utils.sleep(300000);
         cluster.shutdown();
       } else {
-        StormSubmitter.submitTopology(sensorType, topology.getTopologyConfig(), topology.getBuilder().createTopology());
+        StormSubmitter.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology());
       }
     } catch (Exception e) {
       e.printStackTrace();
@@ -521,15 +615,27 @@
     }
   }
 
-  private static Optional<String> getSecurityProtocol(Optional<String> protocol, Map<String, Object> spoutConfig) {
+  private static Optional<String> getSecurityProtocol(Optional<String> protocol, List<Map<String, Object>> spoutConfig) {
     Optional<String> ret = protocol;
-    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && protocol.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) {
       ret = Optional.empty();
     }
     if(!ret.isPresent()) {
-      ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
+      // Need to look through spoutConfig for any non-plaintext
+      String spoutConfigSp = null;
+      for (Map<String, Object> config: spoutConfig) {
+        String configSp = (String) config.get(KafkaUtils.SECURITY_PROTOCOL);
+        if (configSp != null && !SecurityProtocol.PLAINTEXT.name.equals(configSp)) {
+          // We have a winner
+          spoutConfigSp = configSp;
+        } else if (configSp != null) {
+          // Use something explicitly defined.
+          spoutConfigSp = configSp;
+        }
+      }
+      ret = Optional.ofNullable(spoutConfigSp);
     }
-    if(ret.isPresent() && ret.get().equalsIgnoreCase("PLAINTEXT")) {
+    if(ret.isPresent() && ret.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) {
       ret = Optional.empty();
     }
     return ret;
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
index 0ede0f8..98aca7b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.topology.config;
 
+import java.util.Collection;
 import org.apache.metron.common.configuration.SensorParserConfig;
 
 
@@ -26,5 +27,5 @@
  * @param <T>
  */
 public interface ValueSupplier<T> {
-  T get(SensorParserConfig config, Class<T> clazz);
+  T get(Collection<SensorParserConfig> config, Class<T> clazz);
 }
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 15ce735..06f4cec 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -58,6 +58,7 @@
 import org.apache.metron.parsers.BasicParser;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
@@ -185,7 +186,15 @@
   @Test
   public void testEmpty() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
@@ -209,7 +218,7 @@
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(new NullPointerException())
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(sampleBinary);
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
@@ -217,7 +226,15 @@
   @Test
   public void testInvalid() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
@@ -243,7 +260,7 @@
 
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_INVALID)
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorFields(new HashSet<String>() {{ add("field"); }})
             .addRawMessage(new JSONObject(){{
               put("field", "invalidValue");
@@ -255,14 +272,20 @@
 
   @Test
   public void test() throws Exception {
-
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
       }
-
     };
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -290,7 +313,6 @@
     when(parser.validate(eq(messages.get(1)))).thenReturn(true);
     when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
     when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(tuple);
     verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
     verify(outputCollector, times(2)).ack(tuple);
@@ -317,21 +339,15 @@
   @Test
   public void testFilterSuccess() throws Exception {
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig() {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
-      }
-    };
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -358,10 +374,17 @@
   @Test
   public void testFilterFailure() throws Exception {
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
-      protected SensorParserConfig getSensorParserConfig() {
+      protected SensorParserConfig getSensorParserConfig(String sensorType) {
         try {
           return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
         } catch (IOException e) {
@@ -433,21 +456,15 @@
 
       }
     };
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, dummyParser, new WriterHandler(recordingWriter)) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig() {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
-      }
-    };
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            dummyParser,
+            null,
+            new WriterHandler(recordingWriter)
+        )
+    );
+    ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations);
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -461,10 +478,16 @@
 
   @Test
   public void testDefaultBatchSize() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         // this uses default batch size
@@ -487,7 +510,6 @@
       response.addSuccess(uniqueTuples[i]);
     }
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     for (Tuple tuple : uniqueTuples) {
       parserBolt.execute(tuple);
     }
@@ -498,10 +520,16 @@
 
   @Test
   public void testLessRecordsThanDefaultBatchSize() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         // this uses default batch size
@@ -524,7 +552,6 @@
       uniqueTuples[i] = mock(Tuple.class);
       response.addSuccess(uniqueTuples[i]);
     }
-    parserBolt.withMessageFilter(filter);
     for (Tuple tuple : uniqueTuples) {
       parserBolt.execute(tuple);
     }
@@ -542,10 +569,16 @@
 
   @Test
   public void testBatchOfOne() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(1));
@@ -563,17 +596,22 @@
     BulkWriterResponse response = new BulkWriterResponse();
     response.addSuccess(t1);
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     verify(outputCollector, times(1)).ack(t1);
   }
 
   @Test
   public void testBatchOfFive() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(5));
@@ -592,7 +630,6 @@
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllSuccesses(tuples);
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     writeNonBatch(outputCollector, parserBolt, t1);
     writeNonBatch(outputCollector, parserBolt, t2);
     writeNonBatch(outputCollector, parserBolt, t3);
@@ -610,9 +647,16 @@
 
   @Test
   public void testBatchOfFiveWithError() throws Exception {
-
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(5));
@@ -629,7 +673,6 @@
     when(parser.validate(any())).thenReturn(true);
     when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     parserBolt.execute(t2);
     parserBolt.execute(t3);
@@ -654,6 +697,25 @@
     parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
   }
 
+  private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
+      String csvWithFieldTransformations) {
+    return new ParserBolt("zookeeperUrl", parserMap) {
+      @Override
+      protected SensorParserConfig getSensorParserConfig(String sensorType) {
+        try {
+          return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater(Optional.of(1));
+      }
+    };
+  }
+
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);
   }
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index a23c368..b04d8f7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -18,6 +18,20 @@
 
 package org.apache.metron.parsers.bolt;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.log4j.Level;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
@@ -38,20 +52,6 @@
 import org.junit.Test;
 import org.mockito.Mock;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 public class WriterBoltTest extends BaseBoltTest{
   @Mock
   protected TopologyContext topologyContext;
@@ -164,7 +164,7 @@
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.DEFAULT_ERROR)
             .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}"))
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(new JSONObject());
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index ec7c3ab..2cba40a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -17,9 +17,18 @@
  */
 package org.apache.metron.parsers.integration;
 
-import com.google.common.collect.ImmutableList;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
@@ -30,29 +39,10 @@
 import org.apache.metron.integration.ProcessorResult;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
 import org.json.simple.JSONObject;
-import org.mockito.Matchers;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,12 +84,20 @@
 
     public ShimParserBolt(List<byte[]> output) {
       super(null
-           , sensorType == null?config.getSensorTopic():sensorType
-           , ReflectionUtils.createInstance(config.getParserClassName())
-           , new WriterHandler( new CollectingWriter(output))
+          , Collections.singletonMap(
+              sensorType == null ? config.getSensorTopic() : sensorType,
+              new ParserComponents(
+              ReflectionUtils.createInstance(config.getParserClassName()),
+                  null,
+                  new WriterHandler(new CollectingWriter(output))
+              )
+         )
       );
       this.output = output;
-      getParser().configure(config.getParserConfig());
+      Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap();
+      for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) {
+        sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
+      }
     }
 
     @Override
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 7f40684..15b53b7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,6 +17,17 @@
  */
 package org.apache.metron.parsers.integration.components;
 
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.ZKServerComponent;
@@ -27,22 +38,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
 public class ParserTopologyComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private Properties topologyProperties;
   private String brokerUrl;
-  private String sensorType;
+  private List<String> sensorTypes;
   private LocalCluster stormCluster;
   private String outputTopic;
   private String errorTopic;
@@ -51,7 +53,7 @@
 
     Properties topologyProperties;
     String brokerUrl;
-    String sensorType;
+    List<String> sensorTypes;
     String outputTopic;
     String errorTopic;
 
@@ -63,8 +65,8 @@
       this.brokerUrl = brokerUrl;
       return this;
     }
-    public Builder withSensorType(String sensorType) {
-      this.sensorType = sensorType;
+    public Builder withSensorTypes(List<String> sensorTypes) {
+      this.sensorTypes = sensorTypes;
       return this;
     }
 
@@ -80,7 +82,7 @@
 
     public ParserTopologyComponent build() {
 
-      if(sensorType == null) {
+      if(sensorTypes == null || sensorTypes.isEmpty()) {
         throw new IllegalArgumentException("The sensor type must be defined.");
       }
 
@@ -88,20 +90,20 @@
         throw new IllegalArgumentException("The output topic must be defined.");
       }
 
-      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorTypes, outputTopic, errorTopic);
     }
   }
 
-  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, List<String> sensorTypes, String outputTopic, String errorTopic) {
     this.topologyProperties = topologyProperties;
     this.brokerUrl = brokerUrl;
-    this.sensorType = sensorType;
+    this.sensorTypes = sensorTypes;
     this.outputTopic = outputTopic;
     this.errorTopic = errorTopic;
   }
 
-  public void updateSensorType(String sensorType) {
-    this.sensorType = sensorType;
+  public void updateSensorTypes(List<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
   }
 
   @Override
@@ -112,14 +114,14 @@
       ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
               topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
               Optional.ofNullable(brokerUrl),
-              sensorType,
+              sensorTypes,
+              (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
+              (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
               (x,y) -> 1,
               (x,y) -> 1,
               (x,y) -> 1,
               (x,y) -> 1,
-              (x,y) -> 1,
-              (x,y) -> 1,
-              (x,y) -> new HashMap<>(),
+              (x,y) -> Collections.nCopies(sensorTypes.size(), new HashMap<>()),
               (x,y) -> null,
               (x,y) -> outputTopic,
               (x,y) -> errorTopic,
@@ -131,9 +133,9 @@
       );
 
       stormCluster = new LocalCluster();
-      stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());
+      stormCluster.submitTopology(getTopologyName(), stormConf, topologyBuilder.getBuilder().createTopology());
     } catch (Exception e) {
-      throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e);
+      throw new UnableToStartException("Unable to start parser topology for sensorTypes: " + sensorTypes, e);
     }
   }
 
@@ -177,7 +179,7 @@
   protected void killTopology() {
     KillOptions ko = new KillOptions();
     ko.set_wait_secs(0);
-    stormCluster.killTopologyWithOpts(sensorType, ko);
+    stormCluster.killTopologyWithOpts(getTopologyName(), ko);
     try {
       // Actually wait for it to die.
       Thread.sleep(2000);
@@ -185,4 +187,8 @@
       // Do nothing
     }
   }
+
+  protected String getTopologyName() {
+    return StringUtils.join(sensorTypes, "__");
+  }
 }
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index fcfc93b..ae459f4 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -18,31 +18,34 @@
 
 package org.apache.metron.parsers.topology;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Parser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Level;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.Config;
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.util.*;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-
 public class ParserTopologyCLITest {
 
 
@@ -103,11 +106,11 @@
   public void kafkaOffset(boolean longOpt) throws ParseException {
     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .build(longOpt);
     Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
     Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
-    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
   }
   @Test
   public void testCLI_happyPath() throws ParseException {
@@ -127,11 +130,11 @@
   public void happyPath(boolean longOpt) throws ParseException {
     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .build(longOpt);
     Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
     Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
-    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
   }
 
   @Test
@@ -143,7 +146,7 @@
   public void testConfig_noExtra(boolean longOpt) throws ParseException {
    CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                     .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                     .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "1")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "2")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3")
@@ -166,7 +169,7 @@
   public void testOutputTopic(boolean longOpt) throws ParseException {
      CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
                                       .build(longOpt);
     Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
@@ -193,7 +196,7 @@
       FileUtils.write(extraFile, extraConfig);
       CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
               .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
               .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath())
               .build(longOpt);
@@ -208,50 +211,50 @@
   }
 
   private static class ParserInput {
-    private Integer spoutParallelism;
-    private Integer spoutNumTasks;
+    private List<Integer> spoutParallelism;
+    private List<Integer> spoutNumTasks;
     private Integer parserParallelism;
     private Integer parserNumTasks;
     private Integer errorParallelism;
     private Integer errorNumTasks;
-    private Map<String, Object> spoutConfig;
+    private List<Map<String, Object>> spoutConfig;
     private String securityProtocol;
     private Config stormConf;
     private String outputTopic;
     private String errorTopic;
 
-    public ParserInput(ValueSupplier<Integer> spoutParallelism,
-                       ValueSupplier<Integer> spoutNumTasks,
+    public ParserInput(ValueSupplier<List> spoutParallelism,
+                       ValueSupplier<List> spoutNumTasks,
                        ValueSupplier<Integer> parserParallelism,
                        ValueSupplier<Integer> parserNumTasks,
                        ValueSupplier<Integer> errorParallelism,
                        ValueSupplier<Integer> errorNumTasks,
-                       ValueSupplier<Map> spoutConfig,
+                       ValueSupplier<List> spoutConfig,
                        ValueSupplier<String> securityProtocol,
                        ValueSupplier<Config> stormConf,
                        ValueSupplier<String> outputTopic,
                        ValueSupplier<String> errorTopic,
-                       SensorParserConfig config
+                       List<SensorParserConfig> configs
                       )
     {
-      this.spoutParallelism = spoutParallelism.get(config, Integer.class);
-      this.spoutNumTasks = spoutNumTasks.get(config, Integer.class);
-      this.parserParallelism = parserParallelism.get(config, Integer.class);
-      this.parserNumTasks = parserNumTasks.get(config, Integer.class);
-      this.errorParallelism = errorParallelism.get(config, Integer.class);
-      this.errorNumTasks = errorNumTasks.get(config, Integer.class);
-      this.spoutConfig = spoutConfig.get(config, Map.class);
-      this.securityProtocol = securityProtocol.get(config, String.class);
-      this.stormConf = stormConf.get(config, Config.class);
-      this.outputTopic = outputTopic.get(config, String.class);
-      this.errorTopic = outputTopic.get(config, String.class);
+      this.spoutParallelism = spoutParallelism.get(configs, List.class);
+      this.spoutNumTasks = spoutNumTasks.get(configs, List.class);
+      this.parserParallelism = parserParallelism.get(configs, Integer.class);
+      this.parserNumTasks = parserNumTasks.get(configs, Integer.class);
+      this.errorParallelism = errorParallelism.get(configs, Integer.class);
+      this.errorNumTasks = errorNumTasks.get(configs, Integer.class);
+      this.spoutConfig = spoutConfig.get(configs, List.class);
+      this.securityProtocol = securityProtocol.get(configs, String.class);
+      this.stormConf = stormConf.get(configs, Config.class);
+      this.outputTopic = outputTopic.get(configs, String.class);
+      this.errorTopic = errorTopic.get(configs, String.class);
     }
 
-    public Integer getSpoutParallelism() {
+    public List<Integer> getSpoutParallelism() {
       return spoutParallelism;
     }
 
-    public Integer getSpoutNumTasks() {
+    public List<Integer> getSpoutNumTasks() {
       return spoutNumTasks;
     }
 
@@ -271,7 +274,7 @@
       return errorNumTasks;
     }
 
-    public Map<String, Object> getSpoutConfig() {
+    public List<Map<String, Object>> getSpoutConfig() {
       return spoutConfig;
     }
 
@@ -330,43 +333,116 @@
   @Test
   public void testSpoutParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
-                    , "10"
-                    , input -> input.getSpoutParallelism().equals(10)
-                    , () -> {
-                      SensorParserConfig config = getBaseConfig();
-                      config.setSpoutParallelism(20);
-                      return config;
-                    }
-                    , input -> input.getSpoutParallelism().equals(20)
-                    );
+        , "10"
+        , input -> input.getSpoutParallelism().equals(Collections.singletonList(10))
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutParallelism(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getSpoutParallelism().equals(Collections.singletonList(20))
+    );
+  }
+
+  @Test
+  public void testSpoutParallelismMultiple() throws Exception {
+    // Each spout uses it's own
+    // Return one per spout.
+    List<Integer> spoutParCli = new ArrayList<>();
+    spoutParCli.add(10);
+    spoutParCli.add(12);
+    List<Integer> spoutParConfig = new ArrayList<>();
+    spoutParConfig.add(20);
+    spoutParConfig.add(30);
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
+        , "10,12"
+        , input -> input.getSpoutParallelism().equals(spoutParCli)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutParallelism(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSpoutParallelism(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSpoutParallelism().equals(spoutParConfig)
+    );
   }
 
   @Test
   public void testSpoutNumTasks() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
                     , "10"
-                    , input -> input.getSpoutNumTasks().equals(10)
+                    , input -> input.getSpoutNumTasks().equals(Collections.singletonList(10))
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setSpoutNumTasks(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
-                    , input -> input.getSpoutNumTasks().equals(20)
+                    , input -> input.getSpoutNumTasks().equals(Collections.singletonList(20))
                     );
   }
 
   @Test
+  public void testSpoutNumTasksMultiple() throws Exception {
+    // Return one per spout.
+    List<Integer> numTasksCli = new ArrayList<>();
+    numTasksCli.add(10);
+    numTasksCli.add(12);
+    List<Integer> numTasksConfig = new ArrayList<>();
+    numTasksConfig.add(20);
+    numTasksConfig.add(30);
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
+        , "10,12"
+        , input -> input.getSpoutNumTasks().equals(numTasksCli)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutNumTasks(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSpoutNumTasks(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSpoutNumTasks().equals(numTasksConfig)
+    );
+  }
+
+  @Test
   public void testParserParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
-                    , "10"
-                    , input -> input.getParserParallelism().equals(10)
-                    , () -> {
-                      SensorParserConfig config = getBaseConfig();
-                      config.setParserParallelism(20);
-                      return config;
-                    }
-                    , input -> input.getParserParallelism().equals(20)
-                    );
+        , "10"
+        , input -> input.getParserParallelism().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserParallelism(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getParserParallelism().equals(20)
+    );
+  }
+
+  @Test
+  public void testParserParallelismMultiple() throws Exception {
+    // Last one wins
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
+        , "10"
+        , input -> input.getParserParallelism().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserParallelism(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setParserParallelism(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getParserParallelism().equals(30)
+    );
   }
 
   @Test
@@ -377,13 +453,32 @@
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setParserNumTasks(20);
-                      return config;
+                      SensorParserConfig config2 = getBaseConfig();
+                      config2.setParserNumTasks(30);
+                      List<SensorParserConfig> configs = new ArrayList<>();
+                      configs.add(config);
+                      configs.add(config2);
+                      return configs;
                     }
-                    , input -> input.getParserNumTasks().equals(20)
+                    , input -> input.getParserNumTasks().equals(30)
                     );
   }
 
   @Test
+  public void testParserNumTasksMultiple() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
+        , "10"
+        , input -> input.getParserNumTasks().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserNumTasks(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getParserNumTasks().equals(20)
+    );
+  }
+
+  @Test
   public void testErrorParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM
                     , "10"
@@ -391,7 +486,7 @@
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setErrorWriterParallelism(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getErrorParallelism().equals(20)
                     );
@@ -405,7 +500,7 @@
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setErrorWriterNumTasks(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getErrorNumTasks().equals(20)
                     );
@@ -419,13 +514,55 @@
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setSecurityProtocol("KERBEROS");
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getSecurityProtocol().equals("KERBEROS")
                     );
   }
 
   @Test
+  public void testSecurityProtocol_fromCLIMultipleUniform() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+        , "PLAINTEXT"
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSecurityProtocol("PLAINTEXT");
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSecurityProtocol("PLAINTEXT");
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromCLIMultipleMixed() throws Exception {
+    // Non plaintext wins
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+        , "PLAINTEXT"
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSecurityProtocol("PLAINTEXT");
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSecurityProtocol("KERBEROS");
+          SensorParserConfig config3 = getBaseConfig();
+          config3.setSecurityProtocol("PLAINTEXT");
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          configs.add(config3);
+          return configs;
+        }
+        , input -> input.getSecurityProtocol().equals("KERBEROS")
+    );
+  }
+
+  @Test
   public void testSecurityProtocol_fromSpout() throws Exception {
     //Ultimately the order of precedence is CLI > spout config > parser config
     File extraConfig = File.createTempFile("spoutConfig", "json");
@@ -444,7 +581,7 @@
               , () -> {
                 SensorParserConfig config = getBaseConfig();
                 config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
-                return config;
+                return Collections.singletonList(config);
               }
               , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
       );
@@ -458,7 +595,7 @@
               , () -> {
                 SensorParserConfig config = getBaseConfig();
                 config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
-                return config;
+                return Collections.singletonList(config);
               }
               , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
       );
@@ -481,7 +618,7 @@
                         SensorParserConfig config = getBaseConfig();
                         config.setNumWorkers(100);
                         config.setNumAckers(200);
-                        return config;
+                        return Collections.singletonList(config);
                               }
                       , input -> {
                           Config c = input.getStormConf();
@@ -519,7 +656,7 @@
                             put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
                           }}
                                              );
-                        return config;
+                        return Collections.singletonList(config);
                               }
                       , input -> {
                           Config c = input.getStormConf();
@@ -542,22 +679,21 @@
                       put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
                     }};
     Predicate<ParserInput> cliOverrideExpected = input -> {
-      return input.getSpoutConfig().get("extra_config").equals("from_file");
+      return input.getSpoutConfig().get(0).get("extra_config").equals("from_file");
     };
 
     Predicate<ParserInput> configOverrideExpected = input -> {
-      return input.getSpoutConfig().get("extra_config").equals("from_zk")
-                                  ;
+      return input.getSpoutConfig().get(0).get("extra_config").equals("from_zk");
     };
 
-    Supplier<SensorParserConfig> configSupplier = () -> {
+    Supplier<List<SensorParserConfig>> configSupplier = () -> {
       SensorParserConfig config = getBaseConfig();
       config.setSpoutConfig(
               new HashMap<String, Object>() {{
                 put("extra_config", "from_zk");
               }}
       );
-      return config;
+      return Collections.singletonList(config);
     };
     testConfigOption( cliOptions
                     , cliOverrideExpected
@@ -573,7 +709,7 @@
   private void testConfigOption( ParserTopologyCLI.ParserOptions option
                                , String cliOverride
                                , Predicate<ParserInput> cliOverrideCondition
-                               , Supplier<SensorParserConfig> configSupplier
+                               , Supplier<List<SensorParserConfig>> configSupplier
                                , Predicate<ParserInput> configOverrideCondition
   ) throws Exception {
     testConfigOption(
@@ -588,48 +724,48 @@
 
   private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options
                                , Predicate<ParserInput> cliOverrideCondition
-                               , Supplier<SensorParserConfig> configSupplier
+                               , Supplier<List<SensorParserConfig>> configSupplier
                                , Predicate<ParserInput> configOverrideCondition
   ) throws Exception {
     //CLI Override
-    SensorParserConfig config = configSupplier.get();
+    List<SensorParserConfig> configs = configSupplier.get();
     {
       CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
       for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) {
         builder.with(entry.getKey(), entry.getValue());
       }
       CommandLine cmd = builder.build(true);
-      ParserInput input = getInput(cmd, config);
+      ParserInput input = getInput(cmd, configs);
       Assert.assertTrue(cliOverrideCondition.test(input));
     }
     // Config Override
     {
       CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
       CommandLine cmd = builder.build(true);
-      ParserInput input = getInput(cmd, config);
+      ParserInput input = getInput(cmd, configs);
       Assert.assertTrue(configOverrideCondition.test(input));
     }
   }
 
-  private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception {
+  private static ParserInput getInput(CommandLine cmd, List<SensorParserConfig> configs ) throws Exception {
     final ParserInput[] parserInput = new ParserInput[]{null};
     new ParserTopologyCLI() {
       @Override
       protected ParserTopologyBuilder.ParserTopology getParserTopology(
               String zookeeperUrl,
               Optional<String> brokerUrl,
-              String sensorType,
-              ValueSupplier<Integer> spoutParallelism,
-              ValueSupplier<Integer> spoutNumTasks,
+              List<String> sensorType,
+              ValueSupplier<List> spoutParallelism,
+              ValueSupplier<List> spoutNumTasks,
               ValueSupplier<Integer> parserParallelism,
               ValueSupplier<Integer> parserNumTasks,
               ValueSupplier<Integer> errorParallelism,
               ValueSupplier<Integer> errorNumTasks,
-              ValueSupplier<Map> spoutConfig,
+              ValueSupplier<List> spoutConfig,
               ValueSupplier<String> securityProtocol,
               ValueSupplier<Config> stormConf,
               ValueSupplier<String> outputTopic,
@@ -647,7 +783,7 @@
                stormConf,
                outputTopic,
                errorTopic,
-               config
+               configs
        );
 
         return null;
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 49d7521..788df2d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.metron.writers.integration;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -107,7 +108,7 @@
             .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
-            .withSensorType(sensorType)
+            .withSensorTypes(Collections.singletonList(sensorType))
             .withTopologyProperties(topologyProperties)
             .withBrokerUrl(kafkaComponent.getBrokerList())
             .withOutputTopic(parserConfig.getOutputTopic())
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 99506de..cecba3d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -95,6 +96,7 @@
    *    "sensorTopic": "dummy",
    *    "outputTopic": "output",
    *    "errorTopic": "parser_error",
+   *    "readMetadata": true,
    *    "parserConfig": {
    *        "batchSize" : 1,
    *        "columns" : {
@@ -148,7 +150,12 @@
     }};
 
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+    ComponentRunner runner = setupTopologyComponents(
+        topologyProperties,
+        Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig),
+        globalConfigWithValidation
+    );
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -172,7 +179,7 @@
 
   @Test
   public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
-    final String sensorType = "dummy";
+   final String sensorType = "dummy";
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
@@ -181,7 +188,8 @@
     }};
 
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig), globalConfigWithValidation);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -223,27 +231,31 @@
    *
    * @return runner
    */
-  public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType,
-      SensorParserConfig parserConfig, String globalConfig) {
+  public ComponentRunner setupTopologyComponents(Properties topologyProperties, List<String> sensorTypes,
+      List<SensorParserConfig> parserConfigs, String globalConfig) {
     zkServerComponent = getZKServerComponent(topologyProperties);
-    kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }});
+    List<KafkaComponent.Topic> topics = new ArrayList<>();
+    for(String sensorType : sensorTypes) {
+      topics.add(new KafkaComponent.Topic(sensorType, 1));
+    }
+    topics.add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    kafkaComponent = getKafkaComponent(topologyProperties, topics);
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
     configUploadComponent = new ConfigUploadComponent()
         .withTopologyProperties(topologyProperties)
-        .withGlobalConfig(globalConfig)
-        .withParserSensorConfig(sensorType, parserConfig);
+        .withGlobalConfig(globalConfig);
+
+    for (int i = 0; i < sensorTypes.size(); ++i) {
+      configUploadComponent.withParserSensorConfig(sensorTypes.get(i), parserConfigs.get(i));
+    }
 
     parserTopologyComponent = new ParserTopologyComponent.Builder()
-        .withSensorType(sensorType)
+        .withSensorTypes(sensorTypes)
         .withTopologyProperties(topologyProperties)
         .withBrokerUrl(kafkaComponent.getBrokerList())
-        .withErrorTopic(parserConfig.getErrorTopic())
-        .withOutputTopic(parserConfig.getOutputTopic())
+        .withErrorTopic(parserConfigs.get(0).getErrorTopic())
+        .withOutputTopic(parserConfigs.get(0).getOutputTopic())
         .build();
 
     return new ComponentRunner.Builder()
@@ -325,8 +337,22 @@
   @Multiline
   public static String offsetParserConfigJSON;
 
+  /**
+   * {
+   *    "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$DummyObjectParser",
+   *    "sensorTopic":"dummyobjectparser",
+   *    "outputTopic": "enrichments",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "batchSize" : 1
+   *    }
+   * }
+   */
+  @Multiline
+  public static String dummyParserConfigJSON;
+
   @Test
-  public void commits_kafka_offsets_for_emtpy_objects() throws Exception {
+  public void commits_kafka_offsets_for_empty_objects() throws Exception {
     final String sensorType = "emptyobjectparser";
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
@@ -335,7 +361,11 @@
       add(Bytes.toBytes("baz"));
     }};
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty);
+    ComponentRunner runner = setupTopologyComponents(
+        topologyProperties,
+        Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig),
+        globalConfigEmpty);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -356,6 +386,64 @@
     }
   }
 
+  @Test
+  public void test_multiple_sensors() throws Exception {
+    // Setup first sensor
+    final String emptyObjectSensorType = "emptyobjectparser";
+    SensorParserConfig emptyObjectParserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> emptyObjectInputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("foo"));
+      add(Bytes.toBytes("bar"));
+      add(Bytes.toBytes("baz"));
+    }};
+
+    // Setup second sensor
+    final String dummySensorType = "dummyobjectparser";
+    SensorParserConfig dummyParserConfig = JSONUtils.INSTANCE.load(dummyParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> dummyInputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("dummy_foo"));
+      add(Bytes.toBytes("dummy_bar"));
+      add(Bytes.toBytes("dummy_baz"));
+    }};
+
+    final Properties topologyProperties = new Properties();
+
+    List<String> sensorTypes = new ArrayList<>();
+    sensorTypes.add(emptyObjectSensorType);
+    sensorTypes.add(dummySensorType);
+
+    List<SensorParserConfig> parserConfigs = new ArrayList<>();
+    parserConfigs.add(emptyObjectParserConfig);
+    parserConfigs.add(dummyParserConfig);
+
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorTypes, parserConfigs, globalConfigEmpty);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(emptyObjectSensorType, emptyObjectInputMessages);
+      kafkaComponent.writeMessages(dummySensorType, dummyInputMessages);
+
+      final List<byte[]> allInputMessages = new ArrayList<>();
+      allInputMessages.addAll(emptyObjectInputMessages);
+      allInputMessages.addAll(dummyInputMessages);
+      Processor allResultsProcessor = new AllResultsProcessor(allInputMessages, Constants.ENRICHMENT_TOPIC);
+      @SuppressWarnings("unchecked")
+      ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor);
+
+      // validate the output messages
+      assertThat(
+          "size should match",
+          result.getResult().size(),
+          equalTo(allInputMessages.size()));
+      for (JSONObject record : result.getResult()) {
+        assertThat("record should have a guid", record.containsKey("guid"), equalTo(true));
+      }
+    } finally {
+      if (runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
   /**
    * Goal is to check returning an empty JSONObject in our List returned by parse.
    */
@@ -380,6 +468,34 @@
     }
   }
 
+
+  /**
+   * Goal is to check returning an empty JSONObject in our List returned by parse.
+   */
+  public static class DummyObjectParser implements MessageParser<JSONObject>, Serializable {
+
+    @Override
+    public void init() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public List<JSONObject> parse(byte[] bytes) {
+      JSONObject dummy = new JSONObject();
+      dummy.put("dummy_key", "dummy_value");
+      return ImmutableList.of(dummy);
+    }
+
+    @Override
+    public boolean validate(JSONObject message) {
+      return true;
+    }
+
+    @Override
+    public void configure(Map<String, Object> map) {
+    }
+  }
+
   /**
    * Verifies all messages in the provided List of input messages appears in the specified
    * Kafka output topic
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index e35960f..7678584 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -118,7 +119,7 @@
   public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
     LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
     tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 0264b3d..c389854 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,11 +17,22 @@
  */
 package org.apache.metron.writer;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -38,19 +49,6 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.verifyStatic;
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({BulkWriterComponent.class, ErrorUtils.class})
 public class BulkWriterComponentTest {
@@ -130,7 +128,7 @@
   public void writeShouldProperlyHandleWriterErrors() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
@@ -164,7 +162,7 @@
   public void writeShouldProperlyHandleWriterException() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
@@ -183,10 +181,10 @@
   public void errorAllShouldClearMapsAndHandleErrors() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error1 = new MetronError()
-            .withSensorType("sensor1")
+            .withSensorType(Collections.singleton("sensor1"))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
     MetronError error2 = new MetronError()
-            .withSensorType("sensor2")
+            .withSensorType(Collections.singleton("sensor2"))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
 
     BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
diff --git a/use-cases/parser_chaining/README.md b/use-cases/parser_chaining/README.md
index 26fd333..4055bcd 100644
--- a/use-cases/parser_chaining/README.md
+++ b/use-cases/parser_chaining/README.md
@@ -233,3 +233,17 @@
 ```
 
 You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
+
+# Aggregated Parsers with Parser Chaining
+Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
+
+Instead of creating a topology per sensor, all 3 (`pix-syslog-parser`, `cisco-5-304`, and `cisco-6-302`) can be run in a single aggregated parser. It's also possible to aggregate a subset of these parsers (e.g. run `cisco-6-302` as it's own topology, and aggregate the other 2).
+
+The step to start parsers then becomes
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router
+```
+
+The flow through the Storm topology and Kafka topics:
+
+![Aggregated Flow](aggregated_parser_chaining_flow.svg)
\ No newline at end of file
diff --git a/use-cases/parser_chaining/aggregated_parser_chaining_flow.svg b/use-cases/parser_chaining/aggregated_parser_chaining_flow.svg
new file mode 100644
index 0000000..28529c8
--- /dev/null
+++ b/use-cases/parser_chaining/aggregated_parser_chaining_flow.svg
@@ -0,0 +1,14 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="787px" height="967px" version="1.1" content="&lt;mxfile userAgent=&quot;Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36&quot; version=&quot;8.9.5&quot; editor=&quot;www.draw.io&quot; type=&quot;device&quot;&gt;&lt;diagram name=&quot;Page-1&quot; id=&quot;c7558073-3199-34d8-9f00-42111426c3f3&quot;&gt;7Vtbb6M6EP41eewKMBDy2Nt2pXOr1COd3acVAYdYJRiB06b7648dbMAXEppCk27al+KxMXi+b8YzYzIB16vNXRHmy79wDNOJY8WbCbiZOI5tzyz6j0leKokf2JUgKVDMBzWCB/QLciG/L1mjGJbSQIJxSlAuCyOcZTAikiwsCvwsD1vgVH5qHiZQEzxEYapL/0MxWVbSwJk28m8QJUvxZMviLz4Po8ekwOuMP2/igMX2r+pehWIuPr5chjF+bonA7QRcFxiT6mq1uYYp061QW3Xf147e+r0LmJE+N4DqhqcwXfOl/42+Iv5u5EXo43mJCHzIw4i1nynmE3AVpijJaDOij4IFFTzBgiCqw0vesUJxzCa4WuCMPPDJLNouSYEfa8UCNgKl6TVOMZ3mJsMZu0lfCV8cewzctER8ZXcQryApXugQ3gu86g5OQsB1/twgKni6bIHpc1nIOZTU8zZ6pBdclR04uJpec7T5Wb6UKU5+Un4wjTk+ZYfNFPJAcLFiGs9pT0t+CjiU9GkoS6jAHggV1/2yFxgbTEdCRliqGZoSZiVm0KT0mVdzdpWwqz/CxWO49UI5ijRYqBXn7DJ6SVEWb0EwITVnfgHGf85Trs3GW/yzJvRWyOVpOIfpVd2pIGIAaUlWYs4KVNFPmWRt/4xwd0BnALgTTdv3JSxtR8fSNljZzB8CS93MNGxgTD07b3JtZfi2JXyt0uAGke/8Pnb9g1kTZTRtZXQB37lxbRtNn6DIIoWbS7ZD1epnL7hb+XQ9eF1E9R7J1UnCIoFinGtGqQ2DZYChFhYwDQl6kt/EBA5/xj1G9B0boxaelNNgqphqtQB+U3s/Uudx5Xl8IM9TrVmbZ8uUetk9yeMfgTzMBRtHDkYPYCCHZU2Oyg7bkR2+6sj7sqMOm5SNYwR2CD+2ix0mH9/yxGGZV7HpAm1gvNet99qNu/gzgCevbU14cuHZWywBJpbUwjf58jqQ2KFwk5XALOaXN1EaliXdniUU2kpsovFdVtul48a9N97+h/D2fFPgXXxbsI2wHuj2XYNhVyzdadieATFvIKevWKNqjn3NOpDnqdsjmLUh4RHBHg10MxHuRaiM8IV3AViEUXXTqdsjPkPE/iGiZ0v4eo4h3h8tRnRMMWIX4j5F3PlE/K2IewFQUrx3xtzTMJct+qMm3m+zQzCTo7DgPfNuRw+3ZZs7S0y8qXVMTKa/Q8ylJ+KGpL2Vs+GCLHGCszBtZXKHRmVeYAjLnP3Z+Ihhmafk0K4an/eNy7SJxgzMgpOnogjxB6HbLmIPSkVwTCrW84gUYabUc3pn/kqOUMcTY1BxdipU7F8/Oiw3PZBoxhpTFXOdSCoa2MOkolN1mx2QZqBHhem8aWaqeFRh5NH82UwpZDpKutLbn6kTzUYkmh7lwaxA0XJF1V9+prW9g3UgdljhZUzB+mhJLdC3JUPRooHxjmqgWl7JDpstmk3l24RKGachTBVEZD0XsES/wnmdK+WMltu1eFcT74blXWuCy+ozDruVhqVwQQxJGGVWk0z9yxo3F+4OtzMAdq6SaE0DT8fOMWCnFjUP+xxAP3SWChLWCpZlmECDNZ4HPJ7yRYBnsCzXgI4aBh6Gjn70IFnW2aPjK2HRO8OjV/Muk6SASUhgTOWiYHQvPN0cp6QHMPLu0b3h9K8pmTbCotoIOfh7drvxIFT3LjvQT/eMe9cwEHYV/6Qjls69a6/V/WYnsVMZK+AGGlaBASq1PHMYVHq0+Jow48ygcv1jQnUyNbNXZJA9vp6Sc0t7zPKtOCmSv6U6ao4J7JlyeOb7B2aZwNemspWpBswz3Y9YN+ss6h6Hjr6JjtNj0tENbJVDakjQ+zwBqFN56neCA9LR6/Fx6MnRsSGdLRfYvlh1+x4WiCqChaJ7zsFG46lr4ik4Kk+VCrBteQeyNFAnUo/PhuSontp8JI4Gr+aowc+OxtHpJ0cH4ejbcrczrzuqJ6DvW3f09AzB9Dukz5SuXSnxHQkxxzFUSixHR6wWvgIy2mx+7FdZZ/OLSnD7Pw==&lt;/diagram&gt;&lt;/mxfile&gt;" style="background-color: rgb(255, 255, 255);"><defs/><g transform="translate(0.5,0.5)"><rect x="1" y="19" width="90" height="60" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(34.5,42.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="22" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 22px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">NiFi</div></div></foreignObject><text x="11" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">NiFi</text></switch></g><rect x="310.5" y="19" width="137" height="60" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(331.5,28.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="95" height="40" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 95px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">pix_syslog_router<br />Storm spout<div><br /></div></div></div></foreignObject><text x="48" y="26" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><path d="M 132 20 C 132 39 242 39 242 20" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><path d="M 132 20 C 132 -5.33 242 -5.33 242 20 L 242 78 C 242 103.33 132 103.33 132 78 Z" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(137.5,49.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="98" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 100px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">pix_syslog_sensor<br />Kafka topic</div></div></foreignObject><text x="49" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">pix_syslog_sensor&lt;br&gt;Kafka topic</text></switch></g><path d="M 243.5 55 L 243.5 43 L 288.5 43 L 288.5 31.5 L 309.5 49 L 288.5 66.5 L 288.5 55 Z" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 92.5 55 L 92.5 43 L 109.5 43 L 109.5 31.5 L 130.5 49 L 109.5 66.5 L 109.5 55 Z" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><rect x="229" y="155" width="300" height="300" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><path d="M 373 80.5 L 385 80.5 L 385 132.5 L 396.5 132.5 L 379 153.5 L 361.5 132.5 L 373 132.5 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 117 528 C 117 547 227 547 227 528" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><path d="M 117 528 C 117 502.67 227 502.67 227 528 L 227 586 C 227 611.33 117 611.33 117 586 Z" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(140.5,557.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="62" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 64px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;"><span>cisco-5-304</span><br />Kafka topic</div></div></foreignObject><text x="31" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><path d="M 549.5 528 C 549.5 547 659.5 547 659.5 528" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><path d="M 549.5 528 C 549.5 502.67 659.5 502.67 659.5 528 L 659.5 586 C 659.5 611.33 549.5 611.33 549.5 586 Z" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(573.5,557.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="62" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 64px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;"><span>cisco-6-302</span><br />Kafka topic</div></div></foreignObject><text x="31" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><rect x="105" y="669" width="137" height="60" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(140.5,678.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="65" height="40" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 65px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-5-304<br />Storm spout<div><br /></div></div></div></foreignObject><text x="33" y="26" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><rect x="536" y="669" width="137" height="60" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(571.5,678.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="65" height="40" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 65px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-6-302<br />Storm spout<div><br /></div></div></div></foreignObject><text x="33" y="26" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><path d="M 431.5 272 L 431.5 260 L 599 260 Q 611 260 611 272 L 611 486.5 L 622.5 486.5 L 605 507.5 L 587.5 486.5 L 599 486.5 L 599 272 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 611 486.5 L 622.5 486.5 L 605 507.5 L 587.5 486.5 L 599 486.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><path d="M 326.5 260 L 326.5 272 L 178 272 L 178 486.5 L 189.5 486.5 L 172 507.5 L 154.5 486.5 L 166 486.5 L 166 272 Q 166 260 178 260 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 178 486.5 L 189.5 486.5 L 172 507.5 L 154.5 486.5 L 166 486.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><path d="M 166.05 606.69 L 178.04 606.31 L 179.29 646.32 L 190.79 645.96 L 173.95 667.5 L 155.81 647.06 L 167.3 646.7 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 599 606.5 L 611 606.5 L 611 646.5 L 622.5 646.5 L 605 667.5 L 587.5 646.5 L 599 646.5 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 324 888 C 324 907 434 907 434 888" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><path d="M 324 888 C 324 862.67 434 862.67 434 888 L 434 946 C 434 971.33 324 971.33 324 946 Z" fill="none" stroke="#000000" stroke-width="3" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(345.5,917.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="66" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 66px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">enrichments<br />Kafka topic</div></div></foreignObject><text x="33" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">enrichments&lt;br&gt;Kafka topic</text></switch></g><g transform="translate(437.5,776.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="100" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-6-302<br />Grok parser output<br /></div></div></foreignObject><text x="50" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">cisco-6-302&lt;br&gt;Grok parser output&lt;br&gt;</text></switch></g><g transform="translate(22.5,341.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="120" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-5-304 messages<br /></div></div></foreignObject><text x="60" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">cisco-5-304 messages&lt;br&gt;</text></switch></g><g transform="translate(647.5,341.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="120" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-6-302 messages<br /></div></div></foreignObject><text x="60" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">cisco-6-302 messages&lt;br&gt;</text></switch></g><g transform="translate(324.5,171.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="108" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 108px; white-space: normal; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">Aggregated Storm Parser bolt</div></div></foreignObject><text x="54" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">Aggregated Storm Parser bolt</text></switch></g><rect x="244" y="337" width="80" height="80" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(251.5,363.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="64" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 64px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-5-304<br />Grok parser<br /></div></div></foreignObject><text x="32" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><rect x="434" y="337" width="80" height="80" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(441.5,363.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="64" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 64px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-6-302<br />Grok parser<br /></div></div></foreignObject><text x="32" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">[Not supported by viewer]</text></switch></g><path d="M 243.5 705 L 243.5 693 L 278 693 L 278 439.5 L 266.5 439.5 L 284 418.5 L 301.5 439.5 L 290 439.5 L 290 693 Q 290 705 278 705 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 278 439.5 L 266.5 439.5 L 284 418.5 L 301.5 439.5 L 290 439.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><path d="M 534.5 693 L 534.5 705 L 480 705 Q 468 705 468 693 L 468 439.5 L 456.5 439.5 L 474 418.5 L 491.5 439.5 L 480 439.5 L 480 693 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 468 439.5 L 456.5 439.5 L 474 418.5 L 491.5 439.5 L 480 439.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><path d="M 325.5 383 L 325.5 371 L 335 371 Q 347 371 347 383 L 347 851.5 L 358.5 851.5 L 341 872.5 L 323.5 851.5 L 335 851.5 L 335 383 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 347 851.5 L 358.5 851.5 L 341 872.5 L 323.5 851.5 L 335 851.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><path d="M 432.5 371 L 432.5 383 L 424 383 L 424 851.5 L 435.5 851.5 L 418 872.5 L 400.5 851.5 L 412 851.5 L 412 383 Q 412 371 424 371 Z" fill="#ffffff" stroke="#000000" stroke-width="3" stroke-linejoin="round" stroke-miterlimit="10" pointer-events="none"/><path d="M 424 851.5 L 435.5 851.5 L 418 872.5 L 400.5 851.5 L 412 851.5" fill="none" stroke="#000000" stroke-width="3" stroke-linejoin="flat" stroke-miterlimit="4" pointer-events="none"/><g transform="translate(217.5,776.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="100" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">cisco-5-304<br />Grok parser output<br /></div></div></foreignObject><text x="50" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">cisco-5-304&lt;br&gt;Grok parser output&lt;br&gt;</text></switch></g><rect x="328" y="215" width="102" height="102" fill="none" stroke="#000000" stroke-width="3" pointer-events="none"/><g transform="translate(331.5,252.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="94" height="26" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Helvetica; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 96px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">pix_syslog_router<br />Grok parser<br /></div></div></foreignObject><text x="47" y="19" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">pix_syslog_router&lt;br&gt;Grok parser&lt;br&gt;</text></switch></g></g></svg>
\ No newline at end of file
diff --git a/use-cases/parser_chaining/aggregated_parser_chaining_flow.xml b/use-cases/parser_chaining/aggregated_parser_chaining_flow.xml
new file mode 100644
index 0000000..22cda9f
--- /dev/null
+++ b/use-cases/parser_chaining/aggregated_parser_chaining_flow.xml
@@ -0,0 +1,14 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+<!-- This is a draw.io diagram.  You can load it from http://www.draw.io -->
+<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36" version="8.9.5" editor="www.draw.io" type="device"><diagram name="Page-1" id="c7558073-3199-34d8-9f00-42111426c3f3">7Vtbb6M6EP41eewKMBDy2Nt2pXOr1COd3acVAYdYJRiB06b7648dbMAXEppCk27al+KxMXi+b8YzYzIB16vNXRHmy79wDNOJY8WbCbiZOI5tzyz6j0leKokf2JUgKVDMBzWCB/QLciG/L1mjGJbSQIJxSlAuCyOcZTAikiwsCvwsD1vgVH5qHiZQEzxEYapL/0MxWVbSwJk28m8QJUvxZMviLz4Po8ekwOuMP2/igMX2r+pehWIuPr5chjF+bonA7QRcFxiT6mq1uYYp061QW3Xf147e+r0LmJE+N4DqhqcwXfOl/42+Iv5u5EXo43mJCHzIw4i1nynmE3AVpijJaDOij4IFFTzBgiCqw0vesUJxzCa4WuCMPPDJLNouSYEfa8UCNgKl6TVOMZ3mJsMZu0lfCV8cewzctER8ZXcQryApXugQ3gu86g5OQsB1/twgKni6bIHpc1nIOZTU8zZ6pBdclR04uJpec7T5Wb6UKU5+Un4wjTk+ZYfNFPJAcLFiGs9pT0t+CjiU9GkoS6jAHggV1/2yFxgbTEdCRliqGZoSZiVm0KT0mVdzdpWwqz/CxWO49UI5ijRYqBXn7DJ6SVEWb0EwITVnfgHGf85Trs3GW/yzJvRWyOVpOIfpVd2pIGIAaUlWYs4KVNFPmWRt/4xwd0BnALgTTdv3JSxtR8fSNljZzB8CS93MNGxgTD07b3JtZfi2JXyt0uAGke/8Pnb9g1kTZTRtZXQB37lxbRtNn6DIIoWbS7ZD1epnL7hb+XQ9eF1E9R7J1UnCIoFinGtGqQ2DZYChFhYwDQl6kt/EBA5/xj1G9B0boxaelNNgqphqtQB+U3s/Uudx5Xl8IM9TrVmbZ8uUetk9yeMfgTzMBRtHDkYPYCCHZU2Oyg7bkR2+6sj7sqMOm5SNYwR2CD+2ix0mH9/yxGGZV7HpAm1gvNet99qNu/gzgCevbU14cuHZWywBJpbUwjf58jqQ2KFwk5XALOaXN1EaliXdniUU2kpsovFdVtul48a9N97+h/D2fFPgXXxbsI2wHuj2XYNhVyzdadieATFvIKevWKNqjn3NOpDnqdsjmLUh4RHBHg10MxHuRaiM8IV3AViEUXXTqdsjPkPE/iGiZ0v4eo4h3h8tRnRMMWIX4j5F3PlE/K2IewFQUrx3xtzTMJct+qMm3m+zQzCTo7DgPfNuRw+3ZZs7S0y8qXVMTKa/Q8ylJ+KGpL2Vs+GCLHGCszBtZXKHRmVeYAjLnP3Z+Ihhmafk0K4an/eNy7SJxgzMgpOnogjxB6HbLmIPSkVwTCrW84gUYabUc3pn/kqOUMcTY1BxdipU7F8/Oiw3PZBoxhpTFXOdSCoa2MOkolN1mx2QZqBHhem8aWaqeFRh5NH82UwpZDpKutLbn6kTzUYkmh7lwaxA0XJF1V9+prW9g3UgdljhZUzB+mhJLdC3JUPRooHxjmqgWl7JDpstmk3l24RKGachTBVEZD0XsES/wnmdK+WMltu1eFcT74blXWuCy+ozDruVhqVwQQxJGGVWk0z9yxo3F+4OtzMAdq6SaE0DT8fOMWCnFjUP+xxAP3SWChLWCpZlmECDNZ4HPJ7yRYBnsCzXgI4aBh6Gjn70IFnW2aPjK2HRO8OjV/Muk6SASUhgTOWiYHQvPN0cp6QHMPLu0b3h9K8pmTbCotoIOfh7drvxIFT3LjvQT/eMe9cwEHYV/6Qjls69a6/V/WYnsVMZK+AGGlaBASq1PHMYVHq0+Jow48ygcv1jQnUyNbNXZJA9vp6Sc0t7zPKtOCmSv6U6ao4J7JlyeOb7B2aZwNemspWpBswz3Y9YN+ss6h6Hjr6JjtNj0tENbJVDakjQ+zwBqFN56neCA9LR6/Fx6MnRsSGdLRfYvlh1+x4WiCqChaJ7zsFG46lr4ik4Kk+VCrBteQeyNFAnUo/PhuSontp8JI4Gr+aowc+OxtHpJ0cH4ejbcrczrzuqJ6DvW3f09AzB9Dukz5SuXSnxHQkxxzFUSixHR6wWvgIy2mx+7FdZZ/OLSnD7Pw==</diagram></mxfile>
\ No newline at end of file