METRON-1378: Create a summarizer closes apache/incubator-metron#879
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 82e0a23..6fd2f5a 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -201,6 +201,7 @@
 %{metron_home}/bin/Whois_CSV_to_JSON.py
 %{metron_home}/bin/geo_enrichment_load.sh
 %{metron_home}/bin/flatfile_loader.sh
+%{metron_home}/bin/flatfile_summarizer.sh
 %{metron_home}/bin/prune_elasticsearch_indices.sh
 %{metron_home}/bin/prune_hdfs_files.sh
 %{metron_home}/bin/threatintel_bulk_prune.sh
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java
new file mode 100644
index 0000000..c43b30b
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+public interface CLIOptions<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> {
+  Option getOption();
+
+  boolean has(CommandLine cli);
+
+  String get(CommandLine cli);
+
+  OptionHandler<OPT_T> getHandler();
+}
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
index 85e7520..6dfebb8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
@@ -18,14 +18,56 @@
 package org.apache.metron.common.utils.cli;
 
 import com.google.common.base.Function;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
 
+import java.util.EnumMap;
 import java.util.Optional;
 
-public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements Function<String, Option>
+public abstract class OptionHandler<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> implements Function<String, Option>
 {
   public Optional<Object> getValue(OPT_T option, CommandLine cli) {
     return Optional.empty();
   }
+
+  public abstract String getShortCode();
+
+  public static Options getOptions(CLIOptions[] values) {
+    Options ret = new Options();
+    for(CLIOptions o : values) {
+      ret.addOption(o.getOption());
+    }
+    return ret;
+  }
+
+  public static void printHelp(String name, CLIOptions[] values) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( name, getOptions(values));
+  }
+
+  public static <OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>>
+  EnumMap<OPT_T, Optional<Object> > createConfig(CommandLine cli, OPT_T[] values, Class<OPT_T> clazz) {
+    EnumMap<OPT_T, Optional<Object> > ret = new EnumMap<>(clazz);
+    for(OPT_T option : values) {
+      ret.put(option, option.getHandler().getValue(option, cli));
+    }
+    return ret;
+  }
+
+  public static CommandLine parse(String name, CommandLineParser parser, String[] args, CLIOptions[] values, CLIOptions helpOption) {
+    try {
+      CommandLine cli = parser.parse(getOptions(values), args);
+      if(helpOption.has(cli)) {
+        printHelp(name, values);
+        System.exit(0);
+      }
+      return cli;
+    } catch (ParseException e) {
+      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+      e.printStackTrace(System.err);
+      printHelp(name, values);
+      System.exit(-1);
+      return null;
+    }
+  }
 }
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index f284dab..1f8e52d 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -193,12 +193,15 @@
 
 There are 2 property maps that work with full Stellar expressions, and 2 properties that will work with Stellar predicates.
 
-| Property            | Description
-|---------------------|---
-| value_transform     | Transform fields defined in the "columns" mapping with Stellar transformations. New keys introduced in the transform will be added to the key metadata.
-| value_filter        | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose domain property is empty after removing the TLD will be omitted.
-| indicator_transform | Transform the indicator column independent of the value transformations. You can refer to the original indicator value by using "indicator" as the variable name, as shown in the example above. In addition, if you prefer to piggyback your transformations, you can refer to the variable "domain", which will allow your indicator transforms to inherit transformations done to this value during the value transformations.
-| indicator_filter    | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose indicator value is empty after removing the TLD will be omitted. 
+| Property             | Description
+|----------------------|---
+| `value_transform`    | Transform fields defined in the "columns" mapping with Stellar transformations. New keys introduced in the transform will be added to the key metadata.
+| `value_filter`       | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose domain property is empty after removing the TLD will be omitted.
+| `indicator_transform`| Transform the indicator column independent of the value transformations. You can refer to the original indicator value by using "indicator" as the variable name, as shown in the example above. In addition, if you prefer to piggyback your transformations, you can refer to the variable "domain", which will allow your indicator transforms to inherit transformations done to this value during the value transformations.
+| `indicator_filter`   | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose indicator value is empty after removing the TLD will be omitted.
+| `state_init`         | Allows a state object to be initialized.  This is a string, so a single expression is created.  The output of this expression will be available as the `state` variable.  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
+| `state_update`       | Allows a state object to be updated.  This is a map, so you can have temporary variables here.  Note that you can reference the `state` variable from this.  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
+| `state_merge`        | Allows a list of states to be merged. This is a string, so a single expression.  There is a special field called `states` available, which is a list of the states (one per thread).  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
 
 top-list.csv
 ```
@@ -366,6 +369,94 @@
 | -t         | --tmp_dir           | No           | Directory for landing the temporary GeoIP data - defaults to /tmp                                |
 | -z         | --zk_quorum         | Yes          | Zookeeper Quorum URL (zk1:port,zk2:port,...)                                                     |
 
+### Flatfile Summarizer
+
+The shell script `$METRON_HOME/bin/flatfile_summarizer.sh` will read data from local disk, HDFS or URLs and generate a summary object.
+The object will be serialized and written to disk, either HDFS or local disk depending on the output mode specified.
+
+It should be noted that this utility uses the same extractor config as the `flatfile_loader.sh`,
+but as the output target is not a key value store (but rather a summary object), it is not necessary
+to specify certain configs:
+* `indicator`, `indicator_filter` and `indicator_transform` are not required, but will be executed if present.
+As in the loader, there will be an indicator field available if you so specify it (by using `indicator` in the config).
+* `type` is neither required nor used
+
+In addition, some new configs are expected:
+* `state_init` : Executed once to initialize the state object (the object written out).
+* `state_update`: Called once per message.  The fields available are the fields for the row as well as
+  * `indicator` - the indicator value if you've specified it in the config
+  * `state` - the current state.  Useful for adding to the state (e.g. `BLOOM_ADD(state, val)` where `val` is the name of a field).
+* `state_merge` : If you are running this multi-threaded and your objects can be merged, this is the statement that will
+merge the state objects created per thread.  There is a special field available to this config:
+  * `states` - a list of the state objects
+
+One special thing to note here is that there is a special configuration
+parameter to the Extractor config that is only considered by this
+utility:
+* `inputFormat` : This specifies how the input data is consumed.  The two implementations are `BY_LINE` and `WHOLE_FILE`.
+
+The default is `BY_LINE`, which makes sense for a list of CSVs where
+each line indicates a unit of information which can be imported.
+However, if you are importing a set of STIX documents, then you want
+each document to be considered as input to the Extractor.
+
+#### Example
+
+Consider the possibility that you want to generate a bloom filter with all of the domains in a CSV structured similarly to
+the Alexa top 1M domains, so the columns are:
+* rank
+* domain name
+
+You want to generate a bloom filter with just the domains, not considering the TLD.
+You would execute the following to:
+* read data from `./top-1m.csv`
+* write data to `./filter.ser`
+* use 5 threads
+
+```
+$METRON_HOME/bin/flatfile_summarizer.sh -i ./top-1m.csv -o ./filter.ser -e ./extractor.json -p 5 -b 128
+```
+
+To configure this, `extractor.json` would look like:
+```
+{
+  "config" : {
+    "columns" : {
+      "rank" : 0,
+      "domain" : 1
+    },
+    "value_transform" : {
+      "domain" : "DOMAIN_REMOVE_TLD(domain)"
+    },
+    "value_filter" : "LENGTH(domain) > 0",
+    "state_init" : "BLOOM_INIT()",
+    "state_update" : {
+      "state" : "BLOOM_ADD(state, domain)"
+    },
+    "state_merge" : "BLOOM_MERGE(states)",
+    "separator" : ","
+  },
+  "extractor" : "CSV"
+}
+```
+
+#### Parameters
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                         |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         | --help              | No           | Generate the help screen/set of options                                                                                                                                             |
+| -q         | --quiet             | No           | Do not update progress                                                                                                                                                              |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                   |
+| -m         | --import_mode       | No           | The Import mode to use: LOCAL, MR.  Default: LOCAL                                                                                                                                  |
+| -om        | --output_mode       | No           | The Output mode to use: LOCAL, HDFS.  Default: LOCAL                                                                                                                                  |
+| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory.  |
+| -o         | --output            | Yes          | The output data location.    |
+| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                   |
+| -p         | --threads           | No           | The number of threads to use when extracting data.  The default is the number of cores.                                                                                             |
+| -b         | --batchSize         | No           | The batch size to use for HBase puts                                                                                                                                                |
+
 ## Pruning Data from Elasticsearch
 
 **Note** - As of the Metron upgrade from Elasticsearch 2.3.3 to 5.6.2, the included Data Pruner is no longer supported. It is replaced in favor of the Curator utility
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
index bd490c8..7fd2741 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -20,9 +20,15 @@
 import org.apache.metron.enrichment.lookup.LookupKV;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Map;
+import java.util.Set;
 
 public interface Extractor {
-    Iterable<LookupKV> extract(String line) throws IOException;
-    void initialize(Map<String, Object> config);
+  Iterable<LookupKV> extract(String line) throws IOException;
+  void initialize(Map<String, Object> config);
+  default Set<ExtractorCapabilities> getCapabilities() {
+    return EnumSet.noneOf(ExtractorCapabilities.class);
+  }
 }
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java
new file mode 100644
index 0000000..91c6232
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+public enum ExtractorCapabilities {
+  STATEFUL,
+  MERGEABLE;
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java
new file mode 100644
index 0000000..df2334d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface StatefulExtractor extends Extractor {
+  Object initializeState(Map<String, Object> config);
+  Object mergeStates(List<? extends Object> states);
+  Iterable<LookupKV> extract(String line, AtomicReference<Object> state) throws IOException;
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
index 790ea9f..c47dfc6 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
@@ -17,22 +17,15 @@
  */
 package org.apache.metron.dataloads.extractor;
 
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_FILTER;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_TRANSFORM;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.VALUE_FILTER;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.VALUE_TRANSFORM;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.ZK_QUORUM;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.lang.ref.Reference;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
@@ -47,8 +40,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TransformFilterExtractorDecorator extends ExtractorDecorator {
+import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
+
+public class TransformFilterExtractorDecorator extends ExtractorDecorator implements StatefulExtractor {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String STATE_KEY = "state";
+  public static final String STATES_KEY = "states";
+
 
   protected enum ExtractorOptions {
     VALUE_TRANSFORM("value_transform"),
@@ -56,7 +54,10 @@
     INDICATOR_TRANSFORM("indicator_transform"),
     INDICATOR_FILTER("indicator_filter"),
     ZK_QUORUM("zk_quorum"),
-    INDICATOR("indicator");
+    INDICATOR("indicator"),
+    STATE_INIT("state_init"),
+    STATE_UPDATE("state_update"),
+    STATE_MERGE("state_merge");
 
     private String key;
 
@@ -83,6 +84,9 @@
   private StellarProcessor transformProcessor;
   private StellarPredicateProcessor filterProcessor;
   private Map<String, Object> globalConfig;
+  private Map<String, String> stateUpdate;
+  private String stateMerge;
+  private Set<ExtractorCapabilities> capabilities;
 
   public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
     super(decoratedExtractor);
@@ -91,8 +95,11 @@
     this.indicatorTransforms = new LinkedHashMap<>();
     this.valueFilter = "";
     this.indicatorFilter = "";
+    this.stateUpdate = new LinkedHashMap<>();
+    this.stateMerge = "";
     this.transformProcessor = new StellarProcessor();
     this.filterProcessor = new StellarPredicateProcessor();
+    this.capabilities = EnumSet.noneOf(ExtractorCapabilities.class);
   }
 
   @Override
@@ -110,6 +117,17 @@
     if (INDICATOR_FILTER.existsIn(config)) {
       this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
     }
+    if (STATE_UPDATE.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.STATEFUL);
+      this.stateUpdate = getTransforms(config, STATE_UPDATE.toString());
+    }
+    if(STATE_INIT.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.STATEFUL);
+    }
+    if (STATE_MERGE.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.MERGEABLE);
+      this.stateMerge = getFilter(config, STATE_MERGE.toString());
+    }
     String zkClientUrl = "";
     if (ZK_QUORUM.existsIn(config)) {
       zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
@@ -120,6 +138,29 @@
     StellarFunctions.initialize(stellarContext);
     this.transformProcessor = new StellarProcessor();
     this.filterProcessor = new StellarPredicateProcessor();
+
+  }
+
+  @Override
+  public Object initializeState(Map<String, Object> config) {
+    if(STATE_INIT.existsIn(config)) {
+      MapVariableResolver resolver = new MapVariableResolver(globalConfig);
+      return transformProcessor.parse( config.get(STATE_INIT.toString()).toString()
+                                      , resolver
+                                      , StellarFunctions.FUNCTION_RESOLVER()
+                                      , stellarContext
+                                      );
+    }
+    return null;
+  }
+
+  @Override
+  public Object mergeStates(List<? extends Object> states) {
+    return transformProcessor.parse( stateMerge
+                            , new MapVariableResolver(new HashMap<String, Object>() {{ put(STATES_KEY, states); }}, globalConfig)
+                            , StellarFunctions.FUNCTION_RESOLVER()
+                            , stellarContext
+                            );
   }
 
   private String getFilter(Map<String, Object> config, String valueFilter) {
@@ -187,10 +228,20 @@
   }
 
   @Override
+  public Set<ExtractorCapabilities> getCapabilities() {
+    return capabilities;
+  }
+
+  @Override
   public Iterable<LookupKV> extract(String line) throws IOException {
+    return extract(line, new AtomicReference<>(null));
+  }
+
+  @Override
+  public Iterable<LookupKV> extract(String line, AtomicReference<Object> state) throws IOException {
     List<LookupKV> lkvs = new ArrayList<>();
     for (LookupKV lkv : super.extract(line)) {
-      if (updateLookupKV(lkv)) {
+      if (updateLookupKV(lkv, state)) {
         lkvs.add(lkv);
       }
     }
@@ -202,23 +253,30 @@
    * @param lkv LookupKV to transform and filter
    * @return true if lkv is not null after transform/filter
    */
-  private boolean updateLookupKV(LookupKV lkv) {
+  private boolean updateLookupKV(LookupKV lkv, AtomicReference<Object> state) {
     Map<String, Object> ret = lkv.getValue().getMetadata();
     Map<String, Object> ind = new LinkedHashMap<>();
     String indicator = lkv.getKey().getIndicator();
     // add indicator as a resolvable variable. Also enable using resolved/transformed variables and values from operating on the value metadata
     ind.put(INDICATOR.toString(), indicator);
-    MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig);
+    Map<String, Object> stateMap = new LinkedHashMap<>();
+    stateMap.put(STATE_KEY, state.get());
+    MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig, stateMap);
     transform(valueTransforms, ret, resolver);
     transform(indicatorTransforms, ind, resolver);
     // update indicator
     Object updatedIndicator = ind.get(INDICATOR.toString());
-    if (updatedIndicator != null) {
+    if (updatedIndicator != null || getCapabilities().contains(ExtractorCapabilities.STATEFUL)) {
       if (!(updatedIndicator instanceof String)) {
         throw new UnsupportedOperationException("Indicator transform must return String type");
       }
       lkv.getKey().setIndicator((String) updatedIndicator);
-      return filter(indicatorFilter, resolver) && filter(valueFilter, resolver);
+      boolean update = filter(indicatorFilter, resolver) && filter(valueFilter, resolver);
+      if(update && !stateUpdate.isEmpty()) {
+        transform(stateUpdate, stateMap, resolver);
+        state.set(stateMap.get(STATE_KEY));
+      }
+      return update;
     } else {
       return false;
     }
@@ -236,6 +294,9 @@
   }
 
   private Boolean filter(String filterPredicate, MapVariableResolver variableResolver) {
+    if(StringUtils.isEmpty(filterPredicate)) {
+      return true;
+    }
     return filterProcessor.parse(filterPredicate, variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
   }
 
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java
new file mode 100644
index 0000000..a55d69d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+public class CommonOptions {
+  public static class Help<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      return new Option(getShortCode(), "help", false, "Generate Help screen");
+    }
+  }
+
+  public static class Quiet<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Override
+    public String getShortCode() {
+      return "q";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      return new Option(getShortCode(), "quiet", false, "Do not update progress");
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      return Optional.of(option.has(cli));
+    }
+  }
+
+  public static class ImportMode<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    Object[] importModes;
+    Object defaultMode;
+    Function<String, Optional<Object>> resolver;
+    public ImportMode(Object[] importModes, Object defaultMode, Function<String, Optional<Object>> resolver) {
+      this.importModes = importModes;
+      this.defaultMode = defaultMode;
+      this.resolver = resolver;
+    }
+
+    @Override
+    public String getShortCode() {
+      return "m";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      Option o = new Option(getShortCode(), "import_mode", true
+                           , "The Import mode to use: " + Joiner.on(",").join(importModes)
+                           + ".  Default: " +defaultMode
+                           );
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      String mode = option.get(cli);
+      return resolver.apply(mode);
+    }
+  }
+
+  public static class ExtractorConfig<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+      o.setArgName("JSON_FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      try {
+        return Optional.ofNullable(FileUtils.readFileToString(new File(option.get(cli).trim())));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve extractor config from " + option.get(cli) + ": " + e.getMessage(), e);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "e";
+    }
+  }
+
+
+  public static class Log4jProperties<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public String getShortCode() {
+      return "l";
+    }
+  }
+
+
+  public static class NumThreads<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
+      o.setArgName("NUM_THREADS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      int numThreads = Runtime.getRuntime().availableProcessors();
+      if(option.has(cli)) {
+        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(numThreads);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "p";
+    }
+  }
+
+  public static class BatchSize<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
+      o.setArgName("SIZE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      int batchSize = 128;
+      if(option.has(cli)) {
+        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(batchSize);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "b";
+    }
+  }
+
+  public static class Input<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "input", true, "The CSV File to load");
+      o.setArgName("FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      List<String> inputs = new ArrayList<>();
+      for(String input : Splitter.on(",").split(Optional.ofNullable(option.get(cli)).orElse(""))) {
+        inputs.add(input.trim());
+      }
+      return Optional.of(inputs);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "i";
+    }
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
similarity index 87%
rename from metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
rename to metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
index 168d251..f0ee3ad 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
@@ -25,16 +25,18 @@
 
 import java.io.IOException;
 
-public class ExtractorState {
+public class HBaseExtractorState {
   private HTableInterface table;
   private Extractor extractor;
   private HbaseConverter converter;
   private FileSystem fs;
+  private String cf;
 
-  public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter, Configuration config) {
+  public HBaseExtractorState(HTableInterface table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) {
     this.table = table;
     this.extractor = extractor;
     this.converter = converter;
+    this.cf = cf;
     try {
       this.fs = FileSystem.get(config);
     } catch (IOException e) {
@@ -42,6 +44,10 @@
     }
   }
 
+  public String getCf() {
+    return cf; // cf as given to the constructor (presumably the HBase column family; confirm)
+  }
+
   public HTableInterface getTable() {
     return table;
   }
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
index 448f406..2967c6d 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
@@ -21,6 +21,7 @@
 import com.google.common.base.Splitter;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.common.utils.cli.OptionHandler;
 import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
@@ -33,48 +34,14 @@
 import java.util.List;
 import java.util.Optional;
 
-public enum LoadOptions {
-  HELP("h", new OptionHandler<LoadOptions>() {
-
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      return new Option(s, "help", false, "Generate Help screen");
-    }
-  })
-  ,QUIET("q", new OptionHandler<LoadOptions>() {
-
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      return new Option(s, "quiet", false, "Do not update progress");
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      return Optional.of(option.has(cli));
-    }
-  })
-  , IMPORT_MODE("m", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "import_mode", true
-                           , "The Import mode to use: " + Joiner.on(",").join(ImportStrategy.values())
-                           + ".  Default: " + ImportStrategy.LOCAL
-                           );
-      o.setArgName("MODE");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      String mode = option.get(cli);
-      return Optional.of(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL));
-    }
-  })
-  ,HBASE_TABLE("t", new OptionHandler<LoadOptions>() {
+public enum LoadOptions implements CLIOptions<LoadOptions> {
+  HELP(new CommonOptions.Help<> ())
+  ,QUIET(new CommonOptions.Quiet<>())
+  , IMPORT_MODE(new CommonOptions.ImportMode<>( ImportStrategy.values()
+                                              , ImportStrategy.LOCAL
+                                              , mode -> Optional.ofNullable(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL)))
+               )
+  ,HBASE_TABLE(new OptionHandler<LoadOptions>() {
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -88,8 +55,13 @@
     public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
       return Optional.ofNullable(option.get(cli).trim());
     }
+
+    @Override
+    public String getShortCode() {
+      return "t";
+    }
   })
-  ,HBASE_CF("c", new OptionHandler<LoadOptions>() {
+  ,HBASE_CF(new OptionHandler<LoadOptions>() {
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -103,27 +75,19 @@
     public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
       return Optional.ofNullable(option.get(cli).trim());
     }
-  })
-  ,EXTRACTOR_CONFIG("e", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-      o.setArgName("JSON_FILE");
-      o.setRequired(true);
-      return o;
-    }
 
     @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      try {
-        return Optional.ofNullable(FileUtils.readFileToString(new File(option.get(cli).trim())));
-      } catch (IOException e) {
-        throw new IllegalStateException("Unable to retrieve extractor config from " + option.get(cli) + ": " + e.getMessage(), e);
-      }
+    public String getShortCode() {
+      return "c";
     }
   })
-  ,ENRICHMENT_CONFIG("n", new OptionHandler<LoadOptions>() {
+  ,EXTRACTOR_CONFIG(new CommonOptions.ExtractorConfig<>())
+  ,ENRICHMENT_CONFIG(new OptionHandler<LoadOptions>() {
+    @Override
+    public String getShortCode() {
+      return "n";
+    }
+
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -136,126 +100,46 @@
       return o;
     }
   })
-  ,LOG4J_PROPERTIES("l", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
-      o.setArgName("FILE");
-      o.setRequired(false);
-      return o;
-    }
-  })
-  ,NUM_THREADS("p", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
-      o.setArgName("NUM_THREADS");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      int numThreads = Runtime.getRuntime().availableProcessors();
-      if(option.has(cli)) {
-        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
-      }
-      return Optional.of(numThreads);
-    }
-  })
-  ,BATCH_SIZE("b", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
-      o.setArgName("SIZE");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      int batchSize = 128;
-      if(option.has(cli)) {
-        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
-      }
-      return Optional.of(batchSize);
-    }
-  })
-  ,INPUT("i", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "input", true, "The CSV File to load");
-      o.setArgName("FILE");
-      o.setRequired(true);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      List<String> inputs = new ArrayList<>();
-      for(String input : Splitter.on(",").split(Optional.ofNullable(option.get(cli)).orElse(""))) {
-        inputs.add(input.trim());
-      }
-      return Optional.of(inputs);
-    }
-  })
+  ,LOG4J_PROPERTIES(new CommonOptions.Log4jProperties<>())
+  ,NUM_THREADS(new CommonOptions.NumThreads<>())
+  ,BATCH_SIZE(new CommonOptions.BatchSize<>())
+  ,INPUT(new CommonOptions.Input<>())
   ;
   Option option;
   String shortCode;
   OptionHandler<LoadOptions> handler;
-  LoadOptions(String shortCode, OptionHandler<LoadOptions> optionHandler) {
-    this.shortCode = shortCode;
+  LoadOptions(OptionHandler<LoadOptions> optionHandler) {
+    this.shortCode = optionHandler.getShortCode();
     this.handler = optionHandler;
     this.option = optionHandler.apply(shortCode);
   }
 
+  @Override
+  public OptionHandler<LoadOptions> getHandler() {
+    return handler;
+  }
+
+
+  @Override
+  public Option getOption() {
+    return option;
+  }
+
+  @Override
   public boolean has(CommandLine cli) {
     return cli.hasOption(shortCode);
   }
 
+  @Override
   public String get(CommandLine cli) {
     return cli.getOptionValue(shortCode);
   }
 
   public static CommandLine parse(CommandLineParser parser, String[] args) {
-    try {
-      CommandLine cli = parser.parse(getOptions(), args);
-      if(HELP.has(cli)) {
-        printHelp();
-        System.exit(0);
-      }
-      return cli;
-    } catch (ParseException e) {
-      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-      e.printStackTrace(System.err);
-      printHelp();
-      System.exit(-1);
-      return null;
-    }
+    return OptionHandler.parse("SimpleEnrichmentFlatFileLoader", parser, args, values(), HELP);
   }
 
   public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
-    EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
-    for(LoadOptions option : values()) {
-      ret.put(option, option.handler.getValue(option, cli));
-    }
-    return ret;
-  }
-
-  public static void printHelp() {
-    HelpFormatter formatter = new HelpFormatter();
-    formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
-  }
-
-  public static Options getOptions() {
-    Options ret = new Options();
-    for(LoadOptions o : LoadOptions.values()) {
-      ret.addOption(o.option);
-    }
-    return ret;
+    return OptionHandler.createConfig(cli, values(), LoadOptions.class);
   }
 }
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java
new file mode 100644
index 0000000..1cd1abf
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.Summarizers;
+
+import java.io.File;
+import java.util.EnumMap;
+import java.util.Optional;
+
+public class SimpleFlatFileSummarizer {
+  /** Entry point: lets GenericOptionsParser consume the generic Hadoop arguments, then delegates with the rest. */
+  public static void main(String... argv) throws Exception {
+    Configuration hadoopConfig = HBaseConfiguration.create();
+    main(hadoopConfig, new GenericOptionsParser(hadoopConfig, argv).getRemainingArgs());
+  }
+
+  /** Parses the summarizer options, loads the extractor config file and runs the selected summarizer. */
+  public static void main(Configuration hadoopConfig, String[] argv) throws Exception {
+    CommandLine cli = SummarizeOptions.parse(new PosixParser(), argv);
+    EnumMap<SummarizeOptions, Optional<Object>> config = SummarizeOptions.createConfig(cli);
+    if(SummarizeOptions.LOG4J_PROPERTIES.has(cli)) {
+      PropertyConfigurator.configure(SummarizeOptions.LOG4J_PROPERTIES.get(cli));
+    }
+    File extractorConfigFile = new File(SummarizeOptions.EXTRACTOR_CONFIG.get(cli).trim());
+    ExtractorHandler handler = ExtractorHandler.load(FileUtils.readFileToString(extractorConfigFile));
+    Summarizers strategy = (Summarizers) config.get(SummarizeOptions.IMPORT_MODE).get();
+    strategy.getSummarizer().importData(config, handler, hadoopConfig);
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java
new file mode 100644
index 0000000..4e644ba
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.Summarizers;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+/** Command line options accepted by the flat file summarizer (parsed under the name "SimpleFlatFileSummarizer"). */
+public enum SummarizeOptions implements CLIOptions<SummarizeOptions> {
+  HELP(new CommonOptions.Help<>())
+  ,QUIET(new CommonOptions.Quiet<>())
+  , IMPORT_MODE(new CommonOptions.ImportMode<>(Summarizers.values(), Summarizers.LOCAL, mode -> Optional.of(Summarizers.getStrategy(mode).orElse(Summarizers.LOCAL))))
+  , OUTPUT_MODE(new OptionHandler<SummarizeOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      String help = "The output mode to use: " + Joiner.on(",").join(Writers.values()) + ".  Default: " + Writers.LOCAL;
+      Option o = new Option(s, "output_mode", true, help);
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(SummarizeOptions option, CommandLine cli) {
+      return Optional.of(Writers.getStrategy(option.get(cli)).orElse(Writers.LOCAL)); // LOCAL when getStrategy yields nothing
+    }
+
+    @Override
+    public String getShortCode() {
+      return "om";
+    }
+  })
+  ,EXTRACTOR_CONFIG(new CommonOptions.ExtractorConfig<>())
+  ,LOG4J_PROPERTIES(new CommonOptions.Log4jProperties<>())
+  ,NUM_THREADS(new CommonOptions.NumThreads<>())
+  ,BATCH_SIZE(new CommonOptions.BatchSize<>())
+  ,INPUT(new CommonOptions.Input<>())
+  ,OUTPUT(new OptionHandler<SummarizeOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "output", true, "The output file to write");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(SummarizeOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli)); // empty when the option was not supplied
+    }
+
+    @Override
+    public String getShortCode() {
+      return "o";
+    }
+  });
+  Option option;
+  String shortCode;
+  OptionHandler<SummarizeOptions> handler;
+  SummarizeOptions(OptionHandler<SummarizeOptions> optionHandler) {
+    this.shortCode = optionHandler.getShortCode(); // the handler owns the short code
+    this.handler = optionHandler;
+    this.option = optionHandler.apply(shortCode); // build the commons-cli Option once, up front
+  }
+
+  @Override
+  public OptionHandler<SummarizeOptions> getHandler() {
+    return handler;
+  }
+
+  @Override
+  public Option getOption() {
+    return option;
+  }
+
+  @Override
+  public boolean has(CommandLine cli) {
+    return cli.hasOption(shortCode);
+  }
+
+  @Override
+  public String get(CommandLine cli) {
+    return cli.getOptionValue(shortCode);
+  }
+
+  public static CommandLine parse(CommandLineParser parser, String[] args) {
+    return OptionHandler.parse("SimpleFlatFileSummarizer", parser, args, values(), HELP);
+  }
+
+  public static EnumMap<SummarizeOptions, Optional<Object> > createConfig(CommandLine cli) {
+    return OptionHandler.createConfig(cli, values(), SummarizeOptions.class);
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java
new file mode 100644
index 0000000..1709931
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Base for importers that read their inputs directly, whole-file or line by line, and pass each record to {@link #extract}. */
+public abstract class AbstractLocalImporter<OPTIONS_T extends Enum<OPTIONS_T> & CLIOptions, STATE_T>  implements Importer<OPTIONS_T> {
+
+  /** Validates, creates the thread-local state, then extracts all inputs: whole files for {@link WholeFileFormat}, else by line. */
+  @Override
+  public void importData(final EnumMap<OPTIONS_T, Optional<Object>> config,
+                         final ExtractorHandler handler,
+                         final Configuration hadoopConfig) throws IOException, InvalidWriterOutput {
+    validateState(config, handler);
+    ThreadLocal<STATE_T> state = createState(config, hadoopConfig, handler);
+    boolean quiet = isQuiet(config);
+    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
+    List<String> inputs = getInputs(config);
+    FileSystem fs = FileSystem.get(hadoopConfig);
+    if(lineByLine) {
+      int batchSize = batchSize(config);
+      int numThreads = numThreads(config, handler);
+      extractLineByLine(inputs, fs, state, batchSize, numThreads, quiet);
+    } else {
+      extractWholeFiles(inputs, fs, state, quiet);
+    }
+    if(!quiet) {
+      System.out.println();
+    }
+  }
+
+  // The next four hooks supply the individual settings importData needs from the option map.
+  protected abstract List<String> getInputs(final EnumMap<OPTIONS_T, Optional<Object>> config);
+  protected abstract boolean isQuiet(final EnumMap<OPTIONS_T, Optional<Object>> config);
+  protected abstract int batchSize(final EnumMap<OPTIONS_T, Optional<Object>> config);
+  protected abstract int numThreads(final EnumMap<OPTIONS_T, Optional<Object>> config, ExtractorHandler handler);
+
+  /** Hook invoked at the very start of {@link #importData}, before any state is created. */
+  protected abstract void validateState(final EnumMap<OPTIONS_T, Optional<Object>> config, final ExtractorHandler handler);
+
+  /** Supplies the thread-local holder from which each extracting thread obtains its state via {@code get()}. */
+  protected abstract ThreadLocal<STATE_T> createState(final EnumMap<OPTIONS_T, Optional<Object>> config,
+                                                      final Configuration hadoopConfig, final ExtractorHandler handler);
+
+  /** Processes one unit of input: a single line, or the joined content of a whole file. */
+  protected abstract void extract(STATE_T state, String line) throws IOException;
+
+  /** Maps an input string to a {@link Location} through {@link LocationStrategy}; subclasses may override. */
+  protected Location resolveLocation(String input, FileSystem fs) {
+    return LocationStrategy.getLocation(input, fs);
+  }
+
+  /** Extracts each input line by line as a parallel stream submitted to a per-input ForkJoinPool of {@code numThreads}. */
+  public void extractLineByLine(List<String> inputs, FileSystem fs, ThreadLocal<STATE_T> state,
+                                int batchSize, int numThreads, boolean quiet) throws IOException {
+    for(String input : inputs) {
+      final Location loc = resolveLocation(input, fs);
+      final Progress progress = new Progress();
+      if(!quiet) {
+        System.out.println("\nProcessing " + loc.toString());
+      }
+      try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
+        // Created inside the try so an invalid numThreads still surfaces as an IllegalStateException.
+        final ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
+        try {
+          forkJoinPool.submit(() -> stream.parallel().forEach(line -> {
+            try {
+              extract(state.get(), line);
+              if(!quiet) {
+                progress.update();
+              }
+            } catch(IOException e) {
+              throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+            }
+          })).get();
+        } finally {
+          // Without this, every input would leave its pool and worker threads behind.
+          forkJoinPool.shutdown();
+        }
+      } catch (Exception e) {
+        throw new IllegalStateException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /** Extracts, in parallel, each location from getLocationsRecursive as one string (its lines joined with no separator). */
+  public void extractWholeFiles(List<String> inputs, FileSystem fs, ThreadLocal<STATE_T> state, boolean quiet) throws IOException {
+    final Progress progress = new Progress();
+    final List<Location> locations = getLocationsRecursive(inputs, fs);
+    locations.parallelStream().forEach(loc -> {
+      try(BufferedReader br = loc.openReader()) {
+        String s = br.lines().collect(Collectors.joining());
+        extract(state.get(), s);
+        if(!quiet) {
+          progress.update();
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /** Collects the locations that {@code Location.fileVisitor} reports for the given inputs. */
+  protected List<Location> getLocationsRecursive(List<String> inputs, FileSystem fs) throws IOException {
+    final List<Location> locations = new ArrayList<>();
+    Location.fileVisitor(inputs, loc -> locations.add(loc), fs);
+    return locations;
+  }
+
+  /** Console progress indicator: a running count plus a spinner; the count is a long so the spinner index stays valid. */
+  public static class Progress {
+    private long count = 0;
+    private String anim= "|/-\\";
+
+    public synchronized void update() {
+      long currentCount = count++;
+      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt((int) (currentCount % anim.length())));
+    }
+  }
+
+  /** Throws an IllegalStateException naming the option's opt string when the config map has no entry for it. */
+  protected void assertOption(EnumMap<OPTIONS_T, Optional<Object>> config, OPTIONS_T option) {
+    if(!config.containsKey(option)) {
+      throw new IllegalStateException("Expected " + option.getOption().getOpt() + " to be set");
+    }
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
index df88640..7730bb8 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
@@ -20,7 +20,7 @@
 import java.util.Optional;
 
 public enum ImportStrategy {
-  LOCAL(LocalImporter.INSTANCE),
+  LOCAL(new LocalImporter()),
   MR(MapReduceImporter.INSTANCE)
   ;
   private Importer importer;
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
index 81ede08..0c7faf6 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 
 import java.io.IOException;
@@ -29,6 +30,6 @@
 import java.util.List;
 import java.util.Optional;
 
-public interface Importer {
-  void importData(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException;
+public interface Importer<OPTIONS_T extends Enum<OPTIONS_T>> {
+  void importData(EnumMap<OPTIONS_T, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException, InvalidWriterOutput;
 }
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
index 652a4c3..ec37585 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
@@ -18,17 +18,12 @@
 package org.apache.metron.dataloads.nonbulk.flatfile.importer;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.common.utils.file.ReaderSpliterator;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
-import org.apache.metron.dataloads.nonbulk.flatfile.ExtractorState;
+import org.apache.metron.dataloads.nonbulk.flatfile.HBaseExtractorState;
 import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
-import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
-import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.HbaseConverter;
 import org.apache.metron.enrichment.lookup.LookupKV;
@@ -36,119 +31,82 @@
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
-public enum LocalImporter implements Importer {
-  INSTANCE;
+public class LocalImporter extends AbstractLocalImporter<LoadOptions, HBaseExtractorState> {
 
   public interface HTableProviderRetriever {
     HTableProvider retrieve();
   }
 
+  HTableProviderRetriever provider;
+
+  public LocalImporter(HTableProviderRetriever provider) {
+    this.provider = provider;
+  }
+
+  public LocalImporter() {
+    this(() -> new HTableProvider());
+  }
+
 
   @Override
-  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
-                        , final ExtractorHandler handler
-                        , final Configuration hadoopConfig
-                         ) throws IOException {
-    importData(config, handler, hadoopConfig, () -> new HTableProvider());
-
+  protected List<String> getInputs(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (List<String>) config.get(LoadOptions.INPUT).get();
   }
-  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
-                        , final ExtractorHandler handler
-                        , final Configuration hadoopConfig
-                        , final HTableProviderRetriever provider
-                         ) throws IOException {
-    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+
+  @Override
+  protected boolean isQuiet(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (boolean) config.get(LoadOptions.QUIET).get();
+  }
+
+  @Override
+  protected int batchSize(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (int) config.get(LoadOptions.BATCH_SIZE).get();
+  }
+
+  @Override
+  protected int numThreads(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler) {
+    return (int) config.get(LoadOptions.NUM_THREADS).get();
+  }
+
+  @Override
+  protected void validateState(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler) {
+    assertOption(config, LoadOptions.HBASE_CF);
+    assertOption(config, LoadOptions.HBASE_TABLE);
+  }
+
+  /** Builds the per-thread HBase extractor state. A failed table lookup is rethrown as an
+   *  IllegalStateException that keeps the originating IOException as its cause. */
+  @Override
+  protected ThreadLocal<HBaseExtractorState> createState(EnumMap<LoadOptions, Optional<Object>> config
+                                                   , Configuration hadoopConfig
+                                                   , final ExtractorHandler handler
+                                                   ) {
+    ThreadLocal<HBaseExtractorState> state = new ThreadLocal<HBaseExtractorState>() {
       @Override
-      protected ExtractorState initialValue() {
+      protected HBaseExtractorState initialValue() {
         try {
+          String cf = (String) config.get(LoadOptions.HBASE_CF).get();
           HTableInterface table = provider.retrieve().getTable(hadoopConfig, (String) config.get(LoadOptions.HBASE_TABLE).get());
-          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
+          return new HBaseExtractorState(table, cf, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
         } catch (IOException e1) {
-          throw new IllegalStateException("Unable to get table: " + e1);
+          throw new IllegalStateException("Unable to get table: " + e1, e1);
         }
       }
     };
-    boolean quiet = (boolean) config.get(LoadOptions.QUIET).get();
-    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
-    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
-    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
-    if(!lineByLine) {
-      extractWholeFiles(inputs, state, cf, quiet);
-    }
-    else {
-      int batchSize = (int) config.get(LoadOptions.BATCH_SIZE).get();
-      int numThreads = (int) config.get(LoadOptions.NUM_THREADS).get();
-      extractLineByLine(inputs, state, cf, batchSize, numThreads, quiet);
-    }
-
+    return state;
   }
 
-  public void extractLineByLine( List<String> inputs
-                               , ThreadLocal<ExtractorState> state
-                               , String cf
-                               , int batchSize
-                               , int numThreads
-                               , boolean quiet
-                               ) throws IOException {
-    inputs.stream().map(input -> LocationStrategy.getLocation(input, state.get().getFileSystem()))
-                   .forEach( loc -> {
-                      final Progress progress = new Progress();
-                      if(!quiet) {
-                        System.out.println("\nProcessing " + loc.toString());
-                      }
-                      try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
-                        ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
-                        forkJoinPool.submit(() ->
-                          stream.parallel().forEach(input -> {
-                            ExtractorState es = state.get();
-                            try {
-                              es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter(), progress, quiet));
-                            } catch (IOException e) {
-                              throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
-                            }
-                                                             }
-                                       )
-                               ).get();
-                             } catch (Exception e) {
-                               throw new IllegalStateException(e.getMessage(), e);
-                             }
-                                  }
-                   );
+  @Override
+  protected void extract(HBaseExtractorState state, String line) throws IOException {
+    HBaseExtractorState es = state;
+    es.getTable().put(toPut(line, es.getExtractor(), state.getCf(), es.getConverter()));
   }
 
-  public void extractWholeFiles( List<String> inputs, ThreadLocal<ExtractorState> state, String cf, boolean quiet) throws IOException {
-    final Progress progress = new Progress();
-    final List<Location> locations = new ArrayList<>();
-      Location.fileVisitor(inputs, loc -> locations.add(loc), state.get().getFileSystem());
-      locations.parallelStream().forEach(loc -> {
-        try(BufferedReader br = loc.openReader()) {
-          String s = br.lines().collect(Collectors.joining());
-          state.get().getTable().put(extract( s
-                                            , state.get().getExtractor()
-                                            , cf, state.get().getConverter()
-                                            , progress
-                                            , quiet
-                                            )
-                                    );
-        } catch (IOException e) {
-          throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
-        }
-      });
-  }
-
-
-  public List<Put> extract(String line
+  public List<Put> toPut(String line
                      , Extractor extractor
                      , String cf
                      , HbaseConverter converter
-                     , final Progress progress
-                     , final boolean quiet
                      ) throws IOException
   {
     List<Put> ret = new ArrayList<>();
@@ -157,21 +115,8 @@
       Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
       ret.add(put);
     }
-    if(!quiet) {
-      progress.update();
-    }
+
     return ret;
   }
 
-
-  public static class Progress {
-    private int count = 0;
-    private String anim= "|/-\\";
-
-    public synchronized void update() {
-      int currentCount = count++;
-      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
-    }
-  }
-
 }
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
new file mode 100644
index 0000000..7042e86
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalSummarizer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.dataloads.extractor.ExtractorCapabilities;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.StatefulExtractor;
+import org.apache.metron.dataloads.nonbulk.flatfile.SummarizeOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Summarizes local input with a {@link StatefulExtractor}: every thread gets its own state from
+ * {@link #createState}, the per-thread states are merged after the import has run, and the merged
+ * result is handed to the configured {@link Writer}.
+ */
+public class LocalSummarizer extends AbstractLocalImporter<SummarizeOptions, LocalSummarizer.SummarizationState> {
+  /** Every per-thread state created during the current run; appended to from the thread-local initializer. */
+  List<SummarizationState> stateList;
+
+  public LocalSummarizer() {
+    stateList = Collections.synchronizedList(new ArrayList<>());
+  }
+
+  /**
+   * Pairs the extractor with the state reference that one thread accumulates through it.
+   */
+  public static class SummarizationState {
+    AtomicReference<Object> state;
+    StatefulExtractor extractor;
+    public SummarizationState(StatefulExtractor extractor, Object initState) {
+      this.state = new AtomicReference<>(initState);
+      this.extractor = extractor;
+    }
+
+    public AtomicReference<Object> getState() {
+      return state;
+    }
+
+    public StatefulExtractor getExtractor() {
+      return extractor;
+    }
+
+  }
+
+  /**
+   * @return the {@code QUIET} option, defaulting to {@code false} when the option is not in the config
+   */
+  @Override
+  protected boolean isQuiet(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    return (boolean) config.getOrDefault(SummarizeOptions.QUIET, Optional.of(false)).get();
+  }
+
+  /**
+   * @return the {@code BATCH_SIZE} option, defaulting to {@code 1} when the option is not in the config
+   */
+  @Override
+  protected int batchSize(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    return (int) config.getOrDefault(SummarizeOptions.BATCH_SIZE, Optional.of(1)).get();
+  }
+
+  /**
+   * @return the {@code NUM_THREADS} option when the extractor reports the {@code MERGEABLE}
+   *         capability, otherwise {@code 1}
+   */
+  @Override
+  protected int numThreads(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler) {
+    if(handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.MERGEABLE)) {
+      return (int) config.get(SummarizeOptions.NUM_THREADS).get();
+    }
+    else {
+      //force one thread in the case it's not mergeable.
+      return 1;
+    }
+  }
+
+  /**
+   * Rejects a run that cannot produce a summary.
+   *
+   * @throws IllegalStateException if the extractor is not a {@link StatefulExtractor} or does not
+   *         report the {@code STATEFUL} capability
+   */
+  @Override
+  protected void validateState(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler) {
+    if(!(handler.getExtractor() instanceof StatefulExtractor)){
+      throw new IllegalStateException("Extractor must be a stateful extractor and " + handler.getExtractor().getClass().getName() + " is not.");
+    }
+    assertOption(config, SummarizeOptions.OUTPUT);
+    if(!handler.getExtractor().getCapabilities().contains(ExtractorCapabilities.STATEFUL)) {
+      throw new IllegalStateException("Unable to operate on a non-stateful extractor.  " +
+              "If you have not specified \"stateUpdate\" in your Extractor config, there is nothing to do here and nothing will be written.");
+    }
+
+  }
+
+  /**
+   * Creates the thread-local state holder. Each thread's initial state comes from
+   * {@link StatefulExtractor#initializeState} and is also recorded in {@link #stateList} so that
+   * {@link #importData} can merge it afterwards.
+   */
+  @Override
+  protected ThreadLocal<SummarizationState> createState(EnumMap<SummarizeOptions, Optional<Object>> config, Configuration hadoopConfig, ExtractorHandler handler) {
+    final StatefulExtractor extractor = (StatefulExtractor)handler.getExtractor();
+    return ThreadLocal.withInitial(() -> {
+      Object initState = extractor.initializeState(handler.getConfig());
+      SummarizationState ret = new SummarizationState(extractor, initState);
+      stateList.add(ret);
+      return ret;
+    });
+  }
+
+
+  /**
+   * Passes one line to the extractor together with the calling thread's state reference.
+   */
+  @Override
+  protected void extract(SummarizationState state, String line) throws IOException {
+    state.getExtractor().extract(line, state.getState());
+  }
+
+  /**
+   * Validates the output, runs the import, merges the per-thread states and writes the result.
+   *
+   * @param config the summarizer options; {@code OUTPUT_MODE} is cast to {@link Writer}
+   * @param handler supplies the extractor, which is cast to {@link StatefulExtractor}
+   * @param hadoopConfig passed through to the writer and to the import
+   * @throws IOException if the import or the write fails
+   * @throws InvalidWriterOutput if the writer rejects the {@code OUTPUT} location
+   */
+  @Override
+  public void importData(EnumMap<SummarizeOptions, Optional<Object>> config, ExtractorHandler handler, Configuration hadoopConfig) throws IOException, InvalidWriterOutput {
+    Writer writer = (Writer) config.get(SummarizeOptions.OUTPUT_MODE).get();
+    Optional<String> fileName = Optional.ofNullable((String)config.get(SummarizeOptions.OUTPUT).orElse(null));
+    writer.validate(fileName, hadoopConfig);
+    // The same instance is reused across calls (it is held by Summarizers.LOCAL), so drop the
+    // states of any earlier run; otherwise they would be merged into this run's result.
+    stateList.clear();
+    super.importData(config, handler, hadoopConfig);
+    StatefulExtractor extractor = (StatefulExtractor) handler.getExtractor();
+    // Snapshot under the list's own lock: iterating a synchronizedList is not atomic by itself.
+    List<Object> states = new ArrayList<>();
+    synchronized(stateList) {
+      for(SummarizationState s : stateList) {
+        states.add(s.getState().get());
+      }
+    }
+    Object finalState = null;
+    if(states.size() == 1) {
+      finalState = states.get(0);
+    }
+    else if(states.size() > 1) {
+      finalState = extractor.mergeStates(states);
+    }
+    writer.write(finalState, fileName, hadoopConfig);
+  }
+
+  /**
+   * @return the {@code INPUT} option as a list: empty when absent, a single-element list when it
+   *         is a lone string, otherwise the configured list itself
+   */
+  @Override
+  protected List<String> getInputs(EnumMap<SummarizeOptions, Optional<Object>> config) {
+    // Optional.get() throws rather than returning null, so unwrap with orElse(null); otherwise
+    // the "no input" branch below could never be taken.
+    Object o = config.getOrDefault(SummarizeOptions.INPUT, Optional.empty()).orElse(null);
+    if(o == null) {
+      return new ArrayList<>();
+    }
+    if(o instanceof String) {
+      return ImmutableList.of((String)o);
+    }
+    return (List<String>) o;
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
index 401ace2..1b34ed4 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -38,7 +38,7 @@
 import org.slf4j.LoggerFactory;
 
 
-public enum MapReduceImporter implements Importer{
+public enum MapReduceImporter implements Importer<LoadOptions> {
   INSTANCE
   ;
 
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
new file mode 100644
index 0000000..180aa23
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Summarizers.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import java.util.Optional;
+
+/**
+ * Registry of the summarization strategies that can be selected by name.
+ */
+public enum Summarizers {
+  /** Summarizes with a {@link LocalSummarizer}. */
+  LOCAL(new LocalSummarizer());
+
+  private Importer importer;
+
+  Summarizers(Importer importer) {
+    this.importer = importer;
+  }
+
+  /**
+   * @return the importer bound to this strategy
+   */
+  public Importer getSummarizer() {
+    return importer;
+  }
+
+  /**
+   * Resolves a strategy by name, ignoring case and surrounding whitespace.
+   *
+   * @param strategyName the name to look up; may be {@code null}
+   * @return the matching strategy, or empty for {@code null} or an unknown name
+   */
+  public static Optional<Summarizers> getStrategy(String strategyName) {
+    Optional<Summarizers> match = Optional.empty();
+    if(strategyName != null) {
+      final String wanted = strategyName.trim();
+      for(Summarizers candidate : values()) {
+        if(wanted.equalsIgnoreCase(candidate.name())) {
+          match = Optional.of(candidate);
+          break;
+        }
+      }
+    }
+    return match;
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
new file mode 100644
index 0000000..22f4aa1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/ConsoleWriter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A {@link Writer} that prints the result to standard out instead of storing it.
+ */
+public class ConsoleWriter implements Writer{
+  /**
+   * No-op: this writer performs no check on the output location.
+   */
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConfig) {
+    // nothing to validate
+  }
+
+  /**
+   * Prints the object's string form directly; {@code output} and {@code hadoopConfig} are not used.
+   */
+  @Override
+  public void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    System.out.println(String.valueOf(obj));
+  }
+
+  /**
+   * Deserializes the bytes via {@link SerDeUtils} and prints the resulting object.
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    final Object decoded = SerDeUtils.fromBytes(obj, Object.class);
+    System.out.println(decoded);
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
new file mode 100644
index 0000000..1c0c726
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/HDFSWriter.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A {@link Writer} that stores the bytes in a file created through the Hadoop {@link FileSystem}.
+ */
+public class HDFSWriter implements Writer {
+  /**
+   * Checks that an output filename was supplied and that it can name a file.
+   *
+   * @param fileNameOptional the requested output path
+   * @param hadoopConfig unused by this check
+   * @throws InvalidWriterOutput if the name is missing, blank, {@code .}, {@code ..} or ends with {@code /}
+   */
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig) throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    String trimmed = fileName.trim();
+    // isBlank (rather than isEmpty) so that a whitespace-only name is rejected as well
+    if(StringUtils.isBlank(fileName) || trimmed.equals(".") || trimmed.equals("..") || trimmed.endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  /**
+   * Creates the file named by {@code output} on the configured filesystem and writes the bytes to it.
+   *
+   * @param obj the bytes to store
+   * @param output the destination path; {@code get()} is called on it unconditionally
+   * @param hadoopConfig the configuration used to obtain the {@link FileSystem}
+   * @throws IOException if the file cannot be created or written
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    FileSystem fs = FileSystem.get(hadoopConfig);
+    try(FSDataOutputStream stream = fs.create(new Path(output.get()))) {
+      IOUtils.write(obj, stream);
+      stream.flush();
+    }
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
new file mode 100644
index 0000000..7c237c8
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/InvalidWriterOutput.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+/**
+ * Checked exception raised when a writer rejects the requested output location.
+ */
+public class InvalidWriterOutput extends Exception {
+  /**
+   * @param message description of why the output is invalid
+   */
+  public InvalidWriterOutput(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of why the output is invalid
+   * @param cause the underlying failure
+   */
+  public InvalidWriterOutput(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
new file mode 100644
index 0000000..d8bda81
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/LocalWriter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * A {@link Writer} that stores the bytes in a file on the local filesystem.
+ */
+public class LocalWriter implements Writer {
+
+  /**
+   * Checks that an output filename was supplied and that it can name a file.
+   *
+   * @param fileNameOptional the requested output path
+   * @param hadoopConfig unused by this check
+   * @throws InvalidWriterOutput if the name is missing, blank, {@code .}, {@code ..} or ends with {@code /}
+   */
+  @Override
+  public void validate(Optional<String> fileNameOptional, Configuration hadoopConfig) throws InvalidWriterOutput {
+    if(!fileNameOptional.isPresent()) {
+      throw new InvalidWriterOutput("Filename is not present.");
+    }
+    String fileName = fileNameOptional.get();
+    String trimmed = fileName.trim();
+    // isBlank (rather than isEmpty) so that a whitespace-only name is rejected as well
+    if(StringUtils.isBlank(fileName) || trimmed.equals(".") || trimmed.equals("..") || trimmed.endsWith("/")) {
+      throw new InvalidWriterOutput("Filename is empty or otherwise invalid.");
+    }
+  }
+
+  /**
+   * Writes the bytes to the local file named by {@code output}, creating missing parent directories first.
+   *
+   * @param obj the bytes to store
+   * @param output the destination path; {@code get()} is called on it unconditionally
+   * @param hadoopConfig unused by this writer
+   * @throws IOException if the file cannot be opened or written
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    File outFile = new File(output.get());
+    // A bare file name such as "summary.ser" has no parent component, so getParentFile() on the
+    // relative path returns null; resolving to an absolute path first avoids that NPE.
+    File parent = outFile.getAbsoluteFile().getParentFile();
+    if(parent != null && !parent.exists()) {
+      parent.mkdirs();
+    }
+    try(FileOutputStream fs = new FileOutputStream(outFile)) {
+      IOUtils.write(obj, fs);
+      fs.flush();
+    }
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
new file mode 100644
index 0000000..ba13ba1
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.utils.SerDeUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Sink for a result object: validates an output location and writes the result to it.
+ */
+public interface Writer {
+  /**
+   * Checks whether the output location is acceptable to this writer.
+   *
+   * @param output the requested output location, if any
+   * @param hadoopConfig the Hadoop configuration made available to the writer
+   * @throws InvalidWriterOutput if the location is not acceptable
+   */
+  void validate(Optional<String> output, Configuration hadoopConfig) throws InvalidWriterOutput;
+
+  /**
+   * Serializes {@code obj} with {@link SerDeUtils} and hands the bytes to the byte-array overload.
+   * A {@code null} object is skipped, so nothing is written for it.
+   *
+   * @param obj the object to write; may be {@code null}
+   * @param output the output location, if any
+   * @param hadoopConfig the Hadoop configuration made available to the writer
+   * @throws IOException if the underlying write fails
+   */
+  default void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+    if(obj == null) {
+      return;
+    }
+    final byte[] serialized = SerDeUtils.toBytes(obj);
+    write(serialized, output, hadoopConfig);
+  }
+
+  /**
+   * Writes already-serialized bytes to the output location.
+   *
+   * @param obj the bytes to write
+   * @param output the output location, if any
+   * @param hadoopConfig the Hadoop configuration made available to the writer
+   * @throws IOException if the underlying write fails
+   */
+  void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException;
+}
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
new file mode 100644
index 0000000..785ad21
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/writer/Writers.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.writer;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * The available {@link Writer} implementations, selectable by name; each constant forwards to the writer it wraps.
+ */
+public enum Writers implements Writer {
+  LOCAL(new LocalWriter()),
+  HDFS(new HDFSWriter()),
+  CONSOLE(new ConsoleWriter())
+  ;
+  private Writer writer;
+
+  Writers(Writer writer) {
+    this.writer = writer;
+  }
+
+  /**
+   * Resolves a writer by name, ignoring case and surrounding whitespace.
+   *
+   * @param strategyName the name to look up; may be {@code null}
+   * @return the matching writer, or empty for {@code null} or an unknown name
+   */
+  public static Optional<Writers> getStrategy(String strategyName) {
+    Optional<Writers> match = Optional.empty();
+    if(strategyName != null) {
+      final String wanted = strategyName.trim();
+      for(Writers candidate : values()) {
+        if(wanted.equalsIgnoreCase(candidate.name())) {
+          match = Optional.of(candidate);
+          break;
+        }
+      }
+    }
+    return match;
+  }
+
+  /**
+   * Forwards validation to the wrapped writer.
+   */
+  @Override
+  public void validate(Optional<String> output, Configuration hadoopConf) throws InvalidWriterOutput {
+    writer.validate(output, hadoopConf);
+  }
+
+  /**
+   * Forwards the byte-array write to the wrapped writer.
+   */
+  @Override
+  public void write(byte[] obj, Optional<String> output, Configuration hadoopConf) throws IOException {
+    writer.write(obj, output, hadoopConf);
+  }
+}
diff --git a/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
new file mode 100755
index 0000000..018d61a
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/scripts/flatfile_summarizer.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+# Runs SimpleFlatFileSummarizer through "hadoop jar" when a hadoop client is on the PATH, else through plain java.
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export CLASSNAME="org.apache.metron.dataloads.nonbulk.flatfile.SimpleFlatFileSummarizer"
+export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+export HADOOP_OPTS="$HADOOP_OPTS $METRON_JVMFLAGS"
+if command -v hadoop >/dev/null 2>&1  # decide on the exit status instead of parsing the output of which
+then
+  HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+  for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do  # left unquoted: the shell splits (and glob-expands) the entries
+    if [ -f "$jar" ];then
+      LIBJARS="$jar,$LIBJARS"
+    fi
+  done
+  export HADOOP_CLASSPATH
+  hadoop jar "$METRON_HOME/lib/$DM_JAR" $CLASSNAME -libjars ${LIBJARS} "$@"
+else
+  echo "Warning: Metron cannot find the hadoop client on this node.  This means that loading via Map Reduce will NOT function."
+  CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+  java $METRON_JVMFLAGS -cp "$CP" $CLASSNAME "$@"
+fi
+
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
new file mode 100644
index 0000000..17e3206
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizerTest.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalSummarizer;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.RawLocation;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writer;
+import org.apache.metron.stellar.common.utils.StellarProcessorUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class SimpleFlatFileSummarizerTest {
+  /**
+   {
+   "config" : {
+     "columns" : {
+       "rank" : 0,
+       "domain" : 1
+     },
+     "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)"
+     },
+     "value_filter" : "LENGTH(domain) > 0",
+     "state_init" : "MULTISET_INIT()",
+     "state_update" : {
+       "state" : "MULTISET_ADD(state, domain)"
+     },
+     "state_merge" : "MULTISET_MERGE(states)",
+     "separator" : ","
+     },
+     "extractor" : "CSV"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigLineByLine; // NOTE(review): presumably filled by @Multiline from the Javadoc above, so that comment is test data - do not reword it
+
+  /**
+   {
+   "config" : {
+     "columns" : {
+       "rank" : 0,
+       "domain" : 1
+     },
+     "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)"
+     },
+     "value_filter" : "LENGTH(domain) > 0",
+     "state_init" : "MULTISET_INIT()",
+     "state_update" : {
+       "state" : "MULTISET_ADD(state, domain)"
+     },
+     "state_merge" : "MULTISET_MERGE(states)",
+     "separator" : ","
+     },
+     "extractor" : "CSV",
+     "inputFormat" : "WHOLE_FILE"
+   }
+   */
+  @Multiline
+  public static String stellarExtractorConfigWholeFile; // NOTE(review): presumably filled by @Multiline from the Javadoc above (same config plus inputFormat WHOLE_FILE) - do not reword it
+
+  /** Domain names used as test input: generateData() emits one CSV row per entry and testWholeFile stores each as its own mock file. */
+  public static List<String> domains = ImmutableList.of(
+          "google.com",
+          "youtube.com",
+          "facebook.com",
+          "baidu.com",
+          "wikipedia.org",
+          "yahoo.com",
+          "google.co.in",
+          "reddit.com",
+          "qq.com",
+          "amazon.com",
+          "taobao.com",
+          "tmall.com",
+          "twitter.com",
+          "live.com",
+          "vk.com",
+          "google.co.jp",
+          "instagram.com",
+          "sohu.com",
+          "sina.com.cn",
+          "jd.com"
+  );
+
+  /** Builds the CSV test input: one "rank,domain" row per entry of {@link #domains}, joined by newlines. */
+  public static String generateData() {
+    List<String> rows = new ArrayList<>();
+    for(int i = 0; i < domains.size(); ++i) {
+      rows.add((i + 1) + "," + domains.get(i)); // 1-based rank; the previous counter was never advanced, so every row had rank 1
+    }
+    return Joiner.on("\n").join(rows);
+  }
+
+  @Test
+  public void testArgs() throws Exception {
+    // Each element is a single raw token (flag and value together); the parsed values are
+    // trimmed before comparing -- presumably they keep the embedded space, TODO confirm.
+    String[] argv = {
+      "-e extractor.json", "-o out.ser", "-l log4j", "-i input.csv",
+      "-p 2", "-b 128", "-q"
+    };
+    Configuration hadoopConf = new Configuration();
+    String[] remaining = new GenericOptionsParser(hadoopConf, argv).getRemainingArgs();
+    CommandLine parsed = SummarizeOptions.parse(new PosixParser(), remaining);
+
+    Assert.assertEquals("extractor.json", SummarizeOptions.EXTRACTOR_CONFIG.get(parsed).trim());
+    Assert.assertEquals("input.csv", SummarizeOptions.INPUT.get(parsed).trim());
+    Assert.assertEquals("log4j", SummarizeOptions.LOG4J_PROPERTIES.get(parsed).trim());
+    Assert.assertEquals("2", SummarizeOptions.NUM_THREADS.get(parsed).trim());
+    Assert.assertEquals("128", SummarizeOptions.BATCH_SIZE.get(parsed).trim());
+  }
+
+  /** RawLocation backed by a map of name to content; "." is treated as the directory that holds every entry. */
+  public static class InMemoryLocation implements RawLocation {
+    final Map<String, String> inMemoryData;
+    public InMemoryLocation(Map<String, String> inMemoryData) {
+      this.inMemoryData = inMemoryData;
+    }
+
+    @Override
+    public Optional<List<String>> list(String loc) throws IOException {
+      if(!loc.equals(".")) {
+        return Optional.empty(); // only the "." directory can be listed
+      }
+      List<String> names = new ArrayList<>(inMemoryData.keySet());
+      return Optional.of(names);
+    }
+
+    @Override
+    public boolean exists(String loc) {
+      return loc.equals(".") || inMemoryData.containsKey(loc);
+    }
+
+    @Override
+    public boolean isDirectory(String loc) throws IOException {
+      return loc.equals(".");
+    }
+
+    @Override
+    public InputStream openInputStream(String loc) throws IOException {
+      return new ByteArrayInputStream(inMemoryData.get(loc).getBytes()); // platform charset, as before
+    }
+
+    @Override
+    public boolean match(String loc) {
+      return exists(loc);
+    }
+  }
+
+  /** LocalSummarizer whose locations come from an in-memory name-to-content map; static, as it needs no outer instance. */
+  public static class MockSummarizer extends LocalSummarizer {
+    final Map<String, String> mockData;
+    public MockSummarizer(Map<String, String> mockData) {
+      this.mockData = mockData;
+    }
+
+    @Override
+    protected List<Location> getLocationsRecursive(List<String> inputs, FileSystem fs) throws IOException {
+      Set<Location> ret = new HashSet<>();
+      for(String input : inputs) {
+        if(input.equals(".")) { // "." expands to every entry of the mock data
+          for(String name : mockData.keySet()) {
+            ret.add(resolveLocation(name, fs));
+          }
+        } else {
+          ret.add(resolveLocation(input, fs));
+        }
+      }
+      return new ArrayList<>(ret);
+    }
+
+    @Override
+    protected Location resolveLocation(String input, FileSystem fs) {
+      return new Location(input, new InMemoryLocation(mockData));
+    }
+  }
+
+  /** Writer that stores the last object passed to write(Object, ...) in the supplied reference; everything else is a no-op. */
+  public static class PeekingWriter implements Writer {
+    final AtomicReference<Object> ref;
+    public PeekingWriter(AtomicReference<Object> ref) {
+      this.ref = ref;
+    }
+
+    @Override
+    public void validate(Optional<String> output, Configuration hadoopConfig) {
+      // nothing to validate: no output target is ever touched
+    }
+    @Override
+    public void write(Object obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+      ref.set(obj);
+    }
+    @Override
+    public void write(byte[] obj, Optional<String> output, Configuration hadoopConfig) throws IOException {
+      // byte output is dropped; only the object form is recorded
+    }
+  }
+
+  @Test
+  public void testLineByLine() throws IOException, InvalidWriterOutput {
+    testLineByLine(5);
+    testLineByLine(1);
+  }
+
+  /** Summarizes the generated CSV with the line-by-line config, passing {@code numThreads} as NUM_THREADS, then checks every domain. */
+  public void testLineByLine(final int numThreads) throws IOException, InvalidWriterOutput {
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigLineByLine);
+    LocalSummarizer summarizer = new MockSummarizer(ImmutableMap.of("input.csv", generateData()));
+    final AtomicReference<Object> summary = new AtomicReference<>(null);
+    EnumMap<SummarizeOptions, Optional<Object>> options = new EnumMap<>(SummarizeOptions.class);
+    options.put(SummarizeOptions.INPUT, Optional.of("input.csv"));
+    options.put(SummarizeOptions.BATCH_SIZE, Optional.of(5));
+    options.put(SummarizeOptions.QUIET, Optional.of(true));
+    options.put(SummarizeOptions.OUTPUT_MODE, Optional.of(new PeekingWriter(summary)));
+    options.put(SummarizeOptions.OUTPUT, Optional.of("out"));
+    options.put(SummarizeOptions.NUM_THREADS, Optional.of(numThreads));
+    summarizer.importData(options, handler, new Configuration());
+
+    // Each domain, with its TLD removed, must map to a positive value in the captured summary.
+    String expr = "MAP_GET(DOMAIN_REMOVE_TLD(domain), s) > 0";
+    for(String domain : domains) {
+      Boolean found = (Boolean)StellarProcessorUtils.run(expr, ImmutableMap.of("s", summary.get(), "domain", domain));
+      Assert.assertTrue("Can't find " + domain, found);
+    }
+  }
+
+  @Test
+  public void testWholeFile() throws Exception {
+    testWholeFile(5);
+    testWholeFile(1);
+  }
+
+  /** Summarizes one mock file per domain with the whole-file config, passing {@code numThreads} as NUM_THREADS, then checks every domain. */
+  public void testWholeFile(final int numThreads) throws IOException, InvalidWriterOutput {
+    ExtractorHandler handler = ExtractorHandler.load(stellarExtractorConfigWholeFile);
+    // One mock file per domain, keyed by the domain itself, each holding a single CSV row.
+    Map<String, String> files = new HashMap<>();
+    for(String domain : domains) {
+      files.put(domain, "1," + domain);
+    }
+    LocalSummarizer summarizer = new MockSummarizer(files);
+    final AtomicReference<Object> summary = new AtomicReference<>(null);
+    EnumMap<SummarizeOptions, Optional<Object>> options = new EnumMap<>(SummarizeOptions.class);
+    options.put(SummarizeOptions.INPUT, Optional.of("."));
+    options.put(SummarizeOptions.BATCH_SIZE, Optional.of(5));
+    options.put(SummarizeOptions.QUIET, Optional.of(true));
+    options.put(SummarizeOptions.OUTPUT_MODE, Optional.of(new PeekingWriter(summary)));
+    options.put(SummarizeOptions.OUTPUT, Optional.of("out"));
+    options.put(SummarizeOptions.NUM_THREADS, Optional.of(numThreads));
+    summarizer.importData(options, handler, new Configuration());
+    // Each domain, with its TLD removed, must map to a positive value in the captured summary.
+    String expr = "MAP_GET(DOMAIN_REMOVE_TLD(domain), s) > 0";
+    for(String domain : domains) {
+      Boolean found = (Boolean)StellarProcessorUtils.run(expr, ImmutableMap.of("s", summary.get(), "domain", domain));
+      Assert.assertTrue("Can't find " + domain, found);
+    }
+  }
+
+}
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
index ab8ced1..1d2655d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/NetworkFunctions.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.stellar.dsl.functions;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import com.google.common.net.InternetDomainName;
@@ -229,7 +230,13 @@
   private static String extractTld(InternetDomainName idn, String dn) {
 
     if(idn != null && idn.hasPublicSuffix()) {
-      return idn.publicSuffix().toString();
+      String ret = idn.publicSuffix().toString();
+      if(ret.startsWith("InternetDomainName")) {
+        return Joiner.on(".").join(idn.publicSuffix().parts());
+      }
+      else {
+        return ret;
+      }
     }
     else if(dn != null) {
       StringBuffer tld = new StringBuffer("");