METRON-1569: Allow user to change field name conversion when indexing to Elasticsearch (nickwallen via mmiklavc) closes apache/metron#1022
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 7ef9f00..5b67aa5 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -31,6 +31,7 @@
   public static final String ENABLED_CONF = "enabled";
   public static final String INDEX_CONF = "index";
   public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
+  public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter";
 
   public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) {
     Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
@@ -61,7 +62,8 @@
   }
 
   public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
-    Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
+    String key = getKey(sensorType);
+    Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(key);
     if(ret == null) {
       return new HashMap();
     }
@@ -147,6 +149,10 @@
     return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName);
   }
 
+  public String getFieldNameConverter(String sensorName, String writerName) {
+    return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName);
+  }
+
   public static boolean isEnabled(Map<String, Object> conf) {
     return getAs( ENABLED_CONF
                  ,conf
@@ -187,6 +193,10 @@
     );
   }
 
+  public static String getFieldNameConverter(Map<String, Object> conf, String sensorName) {
+    return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class);
+  }
+
   public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) {
     Map<String, Object> ret = conf == null?new HashMap<>():conf;
     ret.put(ENABLED_CONF, enabled);
@@ -210,6 +220,12 @@
     return ret;
   }
 
+  public static Map<String, Object> setFieldNameConverter(Map<String, Object> conf, String index) {
+    Map<String, Object> ret = conf == null ? new HashMap<>(): conf;
+    ret.put(FIELD_NAME_CONVERTER_CONF, index);
+    return ret;
+  }
+
   public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
     return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
   }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
index beb9373..ab25a80 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -72,4 +72,9 @@
   public boolean isDefault(String sensorName) {
     return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName);
   }
+
+  @Override
+  public String getFieldNameConverter(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName);
+  }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
index ae74c65..4603b32 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -101,4 +101,10 @@
   public boolean isDefault(String sensorName) {
     return false;
   }
+
+  @Override
+  public String getFieldNameConverter(String sensorName) {
+    // not applicable
+    return null;
+  }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
index e50bd2b..2d5a62c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
@@ -68,4 +68,10 @@
   public boolean isDefault(String sensorName) {
     return false;
   }
+
+  @Override
+  public String getFieldNameConverter(String sensorName) {
+    // not applicable
+    return null;
+  }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
index 2354f95..4abb582 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -18,17 +18,86 @@
 
 package org.apache.metron.common.configuration.writer;
 
+import org.apache.metron.common.field.FieldNameConverter;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Configures a writer to write messages to an endpoint.
+ *
+ * <p>Each destination will have its own {@link WriterConfiguration}; for example HDFS, Elasticsearch, and Solr.
+ * <p>A writer can be configured independently for each source type.
+ */
 public interface WriterConfiguration extends Serializable {
+
+  /**
+   * Defines the maximum batch size for a given sensor.
+   *
+   * @param sensorName The name of the sensor.
+   * @return The batch size for the sensor.
+   */
   int getBatchSize(String sensorName);
+
+  /**
+   * Defines the batch timeout for a given sensor.  Even if the maximum
+   * batch size has not been reached, the messages will be written when
+   * the timeout is reached.
+   *
+   * @param sensorName The name of the sensor.
+   * @return The batch timeout for the sensor.
+   */
   int getBatchTimeout(String sensorName);
+
+  /**
+   * Returns the batch timeouts for all of the currently configured sensors.
+   * @return All of the batch timeouts.
+   */
   List<Integer> getAllConfiguredTimeouts();
+
+  /**
+   * The name of the index to write to for a given sensor.
+   *
+   * @param sensorName The name of the sensor.
+   * @return The name of the index to write to
+   */
   String getIndex(String sensorName);
+
+  /**
+   * Returns true, if this writer is enabled for the given sensor.
+   *
+   * @param sensorName The name of the sensor.
+   * @return True, if this writer is enabled.  Otherwise, false.
+   */
   boolean isEnabled(String sensorName);
+
+  /**
+   * @param sensorName The name of a sensor.
+   * @return
+   */
   Map<String, Object> getSensorConfig(String sensorName);
+
+  /**
+   * Returns the global configuration.
+   * @return The global configuration.
+   */
   Map<String, Object> getGlobalConfig();
+
+  /**
+   * Returns true, if the current writer configuration is set to all default values.
+   *
+   * @param sensorName The name of the sensor.
+   * @return True, if the writer is using all default values. Otherwise, false.
+   */
   boolean isDefault(String sensorName);
+
+  /**
+   * Return the {@link FieldNameConverter} to use
+   * when writing messages.
+   *
+   * @param sensorName The name of the sensor;
+   * @return
+   */
+  String getFieldNameConverter(String sensorName);
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java
new file mode 100644
index 0000000..a571ce5
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.field;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+
+/**
+ * A {@link FieldNameConverter} that replaces all field names containing dots
+ * with colons.
+ */
+public class DeDotFieldNameConverter implements FieldNameConverter, Serializable {
+
+  private static final long serialVersionUID = -3126840090749760299L;
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Override
+  public String convert(String originalField) {
+
+    String newField = originalField.replace(".",":");
+
+    if(LOG.isDebugEnabled() && originalField.contains(".")) {
+      LOG.debug("Renamed dotted field; original={}, new={}", originalField, newField);
+    }
+
+    return newField;
+  }
+}
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java
similarity index 73%
rename from metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java
rename to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java
index 92e7ec6..0c15ec4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java
@@ -15,10 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.interfaces;
+package org.apache.metron.common.field;
 
+/**
+ * Allows field names to be transformed before a message is written to an endpoint.
+ */
 public interface FieldNameConverter {
 
-    String convert(String originalField);
-
+  /**
+   * Convert the field name.
+   *
+   * @param originalField The original field name.
+   * @return The new field name.
+   */
+  String convert(String originalField);
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java
new file mode 100644
index 0000000..d5858ed
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Enumerates a set of {@link FieldNameConverter} implementations.
+ *
+ * <p>Provides shared instances of each {@link FieldNameConverter}.
+ *
+ * <p>Allows the field name converter to be specified using a short-hand
+ * name, rather than the entire fully-qualified class name.
+ */
+public enum FieldNameConverters implements FieldNameConverter {
+
+  /**
+   * A {@link FieldNameConverter} that does not rename any fields.  All field
+   * names remain unchanged.
+   */
+  NOOP(new NoopFieldNameConverter()),
+
+  /**
+   * A {@link FieldNameConverter} that replaces all field names containing dots
+   * with colons.
+   */
+  DEDOT(new DeDotFieldNameConverter());
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private FieldNameConverter converter;
+
+  FieldNameConverters(FieldNameConverter converter) {
+    this.converter = converter;
+  }
+
+  /**
+   * Returns a shared instance of the {@link FieldNameConverter}/
+   *
+   * @return A shared {@link FieldNameConverter} instance.
+   */
+  public FieldNameConverter get() {
+    return converter;
+  }
+
+  /**
+   * Allows the {@link FieldNameConverters} enums to be used directly as a {@link FieldNameConverter}.
+   *
+   * {@code
+   * FieldNameConverter converter = FieldNameConverters.DEDOT;
+   * }
+   *
+   * @param originalField The original field name.
+   * @return
+   */
+  @Override
+  public String convert(String originalField) {
+    return converter.convert(originalField);
+  }
+
+  /**
+   * Create a new {@link FieldNameConverter}.
+   *
+   * @param sensorType The type of sensor.
+   * @param config The writer configuration.
+   * @return
+   */
+  public static FieldNameConverter create(String sensorType, WriterConfiguration config) {
+    FieldNameConverter result = null;
+
+    // which field name converter has been configured?
+    String converterName = config.getFieldNameConverter(sensorType);
+    if(StringUtils.isNotBlank(converterName)) {
+      try {
+        result = FieldNameConverters.valueOf(converterName);
+
+      } catch (IllegalArgumentException e) {
+        LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}",
+                converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e));
+      }
+    }
+
+    if(result == null) {
+      // if no converter defined or an invalid converter is defined, default to 'DEDOT'
+      result = FieldNameConverters.DEDOT;
+    }
+
+    LOG.debug("Created field name converter; sensorType={}, configured={}, class={}",
+            sensorType, converterName, ClassUtils.getShortClassName(result, "null"));
+
+    return result;
+  }
+}
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java
similarity index 70%
copy from metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java
copy to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java
index 92e7ec6..a7f3f7c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java
@@ -15,10 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.common.interfaces;
+package org.apache.metron.common.field;
 
-public interface FieldNameConverter {
+/**
+ * A {@link FieldNameConverter} that does not rename any fields.  All field
+ * names remain unchanged.
+ */
+public class NoopFieldNameConverter implements FieldNameConverter {
 
-    String convert(String originalField);
+  @Override
+  public String convert(String originalField) {
 
+    // no change to the field name
+    return originalField;
+  }
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java
similarity index 61%
rename from metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java
rename to metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java
index 3e52581..cc1102f 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java
@@ -16,17 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.metron.elasticsearch.writer;
+package org.apache.metron.common.field;
 
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
-public class ElasticsearchFieldNameConverterTest {
+public class DeDotFieldNameConverterTest {
 
-    @Test
-    public void convert() throws Exception {
-        assertEquals("testfield:with:colons",new ElasticsearchFieldNameConverter().convert("testfield.with.colons"));
-    }
+  @Test
+  public void testWithColons() throws Exception {
+    String actual = new DeDotFieldNameConverter().convert("testfield.with.colons");
+    assertEquals("testfield:with:colons", actual);
+  }
 
+  @Test
+  public void testNoColons() throws Exception {
+    String actual = new DeDotFieldNameConverter().convert("test-field-no-colons");
+    assertEquals("test-field-no-colons", actual);
+  }
 }
\ No newline at end of file
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java
new file mode 100644
index 0000000..2c263f2
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the {@link FieldNameConverters} class.
+ */
+public class FieldNameConvertersTest {
+
+  private WriterConfiguration createConfig(String writer, String sensor, String json) throws Exception {
+
+    IndexingConfigurations indexingConfig = new IndexingConfigurations();
+    indexingConfig.updateSensorIndexingConfig(sensor, json.getBytes());
+    return new IndexingWriterConfiguration(writer, indexingConfig);
+  }
+
+  /**
+   * {
+   *  "elasticsearch": {
+   *
+   *    "index": "theIndex",
+   *    "batchSize": 100,
+   *    "batchTimeout": 1000,
+   *    "enabled": true,
+   *    "fieldNameConverter": "DEDOT"
+   *  }
+   * }
+   */
+  @Multiline
+  private static String jsonWithDedot;
+
+  /**
+   * The factory should be able to create a {@link DeDotFieldNameConverter}.
+   */
+  @Test
+  public void testCreateDedot() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithDedot);
+
+    // validate the converter created for 'bro'
+    FieldNameConverter converter = FieldNameConverters.create(sensor, config);
+    assertEquals(FieldNameConverters.DEDOT, converter);
+  }
+
+  /**
+   * {
+   *  "elasticsearch": {
+   *
+   *    "index": "theIndex",
+   *    "batchSize": 100,
+   *    "batchTimeout": 1000,
+   *    "enabled": true,
+   *    "fieldNameConverter": "NOOP"
+   *  }
+   * }
+   */
+  @Multiline
+  private static String jsonWithNoop;
+
+  /**
+   * The factory should be able to create a {@link NoopFieldNameConverter}.
+   */
+  @Test
+  public void testCreateNoop() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithNoop);
+
+    // validate the converter created for 'bro'
+    FieldNameConverter converter = FieldNameConverters.create(sensor, config);
+    assertEquals(FieldNameConverters.NOOP, converter);
+  }
+
+  /**
+   * {
+   *  "elasticsearch": {
+   *
+   *    "index": "theIndex",
+   *    "batchSize": 100,
+   *    "batchTimeout": 1000,
+   *    "enabled": true
+   *  }
+   * }
+   */
+  @Multiline
+  private static String jsonWithNoConverter;
+
+  /**
+   * The factory should create a default {@link FieldNameConverter} if none has been defined
+   * by the user in the writer configuration.
+   */
+  @Test
+  public void testCreateDefault() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter);
+
+    // if none defined, should default to 'DEDOT'
+    FieldNameConverter converter = FieldNameConverters.create(sensor, config);
+    assertEquals(FieldNameConverters.DEDOT, converter);
+  }
+
+  /**
+   * If the user changes the {@link FieldNameConverter} in the writer configuration, the new
+   * {@link FieldNameConverter} should be used after the old one expires.
+   */
+  @Test
+  public void testConfigChange() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+
+    // no converter defined in config, should use 'DEDOT' converter
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter);
+    assertEquals(FieldNameConverters.DEDOT, FieldNameConverters.create(sensor, config));
+
+    // an 'updated' config uses the 'NOOP' converter
+    WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop);
+    assertEquals(FieldNameConverters.NOOP, FieldNameConverters.create(sensor, newConfig));
+  }
+
+  /**
+   * {
+   *  "elasticsearch": {
+   *
+   *    "index": "theIndex",
+   *    "batchSize": 100,
+   *    "batchTimeout": 1000,
+   *    "enabled": true,
+   *    "fieldNameConverter": "INVALID"
+   *  }
+   * }
+   */
+  @Multiline
+  private static String jsonWithInvalidConverter;
+
+  /**
+   * If an invalid field name converter is specified, it should fall-back to using the
+   * default, noop converter.
+   */
+  @Test
+  public void testCreateInvalid() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter);
+
+    // if invalid value defined, it should fall-back to using default 'DEDOT'
+    FieldNameConverter converter = FieldNameConverters.create(sensor, config);
+    assertEquals(FieldNameConverters.DEDOT, converter);
+  }
+
+  /**
+   * {
+   *  "elasticsearch": {
+   *
+   *    "index": "theIndex",
+   *    "batchSize": 100,
+   *    "batchTimeout": 1000,
+   *    "enabled": true,
+   *    "fieldNameConverter": ""
+   *  }
+   * }
+   */
+  @Multiline
+  private static String jsonWithBlankConverter;
+
+  /**
+   * If the field name converter field is blank, it should fall-back to using the
+   * default converter.
+   */
+  @Test
+  public void testCreateBlank() throws Exception {
+
+    final String writer = "elasticsearch";
+    final String sensor = "bro";
+    WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter);
+
+    // if invalid value defined, it should fall-back to using default 'DEDOT'
+    FieldNameConverter converter = FieldNameConverters.create(sensor, config);
+    assertEquals(FieldNameConverters.DEDOT, converter);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index cf8951f..ae508c6 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -216,6 +216,12 @@
             <artifactId>log4j-core</artifactId>
             <version>${global_log4j_core_version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava-testlib</artifactId>
+            <version>${global_guava_version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java
deleted file mode 100644
index 57e07ea..0000000
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.writer;
-
-import org.apache.metron.common.interfaces.FieldNameConverter;
-import java.io.Serializable;
-
-public class ElasticsearchFieldNameConverter implements FieldNameConverter, Serializable {
-
-    private static final long serialVersionUID = -3126840090749760299L;
-
-    @Override
-    public String convert(String originalField) {
-        return originalField.replace(".",":");
-    }
-
-}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 5959623..4b8dd08 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,15 +17,10 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
@@ -40,31 +35,53 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link BulkMessageWriter} that writes messages to Elasticsearch.
+ */
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The Elasticsearch client.
+   */
   private transient TransportClient client;
+
+  /**
+   * A simple data formatter used to build the appropriate Elasticsearch index name.
+   */
   private SimpleDateFormat dateFormat;
-  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
-  private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
+
 
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
+
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
     client = ElasticsearchUtils.getClient(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
   }
 
-
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+
+    // fetch the field name converter for this sensor type
+    FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
+
     final String indexPostfix = dateFormat.format(new Date());
     BulkRequestBuilder bulkRequest = client.prepareBulk();
-
     for(JSONObject message: messages) {
 
       JSONObject esDoc = new JSONObject();
       for(Object k : message.keySet()){
-        deDot(k.toString(), message, esDoc);
+        copyField(k.toString(), message, esDoc, fieldNameConverter);
       }
 
       String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
@@ -125,19 +142,30 @@
     client.close();
   }
 
+  /**
+   * Copies the value of a field from the source message to the destination message.
+   *
+   * <p>A field name may also be transformed in the destination message by the {@link FieldNameConverter}.
+   *
+   * @param sourceFieldName The name of the field to copy from the source message
+   * @param source The source message.
+   * @param destination The destination message.
+   * @param fieldNameConverter The field name converter that transforms the field name
+   *                           between the source and destination.
+   */
   //JSONObject doesn't expose map generics
   @SuppressWarnings("unchecked")
-  private void deDot(String field, JSONObject origMessage, JSONObject message){
+  private void copyField(
+          String sourceFieldName,
+          JSONObject source,
+          JSONObject destination,
+          FieldNameConverter fieldNameConverter) {
 
-    if(field.contains(".")){
+    // allow the field name to be transformed
+    String destinationFieldName = fieldNameConverter.convert(sourceFieldName);
 
-      LOG.debug("Dotted field: {}", field);
-
-    }
-    String newkey = fieldNameConverter.convert(field);
-    message.put(newkey,origMessage.get(field));
-
+    // copy the field
+    destination.put(destinationFieldName, source.get(sourceFieldName));
   }
-
 }
 
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index e3047b6..df5e96a 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -18,9 +18,10 @@
 package org.apache.metron.elasticsearch.integration;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.field.DeDotFieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
-import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter;
 import org.apache.metron.indexing.integration.IndexingIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.InMemoryComponent;
@@ -43,7 +44,7 @@
   private String indexDir = "target/elasticsearch";
   private String dateFormat = "yyyy.MM.dd.HH";
   private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
-  private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
+  private FieldNameConverter fieldNameConverter = FieldNameConverters.DEDOT;
   /**
    * {
    * "yaf_doc": {
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index f4a4501..a9a8ed9 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -50,6 +50,8 @@
 By default, errors during indexing are sent back into the `indexing` kafka queue so that they can be indexed and archived.
 
 ## Sensor Indexing Configuration
+
+
 The sensor specific configuration is intended to configure the
 indexing used for a given sensor type (e.g. `snort`).  
 
@@ -58,16 +60,16 @@
 * `hdfs`
 * `solr`
 
-Depending on how you start the indexing topology, it will have either
-elasticsearch or solr and hdfs writers running.
+Depending on how you start the indexing topology, it will have either Elasticsearch or Solr and HDFS writers running.
 
-The configuration for an individual writer-specific configuration is a JSON map with the following fields:
-* `index` : The name of the index to write to (defaulted to the name of the sensor).
-* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching).
-* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met.  Optional.
-If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm 
-parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since this disables batching.
-* `enabled` : Whether the writer is enabled (default `true`).
+| Property             | Description                                                                           | Default Value                                                                                                                                       |
+|----------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| `index`              | The name of the index to write to.                                                    | Defaults to the name of the sensor.                                                                                                                 |
+| `batchSize`          | The size of the batch that is written to the indices at once.                         | Defaults to `1`; no batching.                                                                                                                         |
+| `batchTimeout`       | The timeout after which a batch will be flushed even if `batchSize` has not been met. | Defaults to a duration which is a fraction of the Storm parameter `topology.message.timeout.secs`, if left undefined or set to 0.  Ignored if batchSize is `1`, since this disables batching.|
+| `enabled`            | A boolean indicating whether the writer is enabled.                                   | Defaults to `true`                                                                                                                                    |
+| `fieldNameConverter` | Defines how field names are transformed before being written to the index.  Only applicable to `elasticsearch`.          | Defaults to `DEDOT`.  Acceptable values are `DEDOT` that replaces all '.' with ':' or `NOOP` that does not change the field names . |
+
 
 ### Meta Alerts
 Alerts can be grouped, after appropriate searching, into a set of alerts called a meta alert.  A meta alert is useful for maintaining the context of searching and grouping during further investigations. Standard searches can return meta alerts, but grouping and other aggregation or sorting requests will not, because there's not a clear way to aggregate in many cases if there are multiple alerts contained in the meta alert. All meta alerts will have the source type of metaalert, regardless of the contained alert's origins.
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
index b8af6a3..2416c86 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java
@@ -18,16 +18,14 @@
 
 package org.apache.metron.indexing.integration;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 1671ab3..2e703f6 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -22,7 +22,7 @@
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.BaseIntegrationTest;
@@ -38,7 +38,6 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index b1404e2..da884f1 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -301,7 +301,6 @@
       @Override
       public Map<String, Object> getSensorConfig(String sensorName) {
         return sensorConfig;
-
       }
 
       @Override
@@ -313,6 +312,11 @@
       public boolean isDefault(String sensorName) {
         return false;
       }
+
+      @Override
+      public String getFieldNameConverter(String sensorName) {
+        return null;
+      }
     };
   }
 }
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index 7c907fd..256f23b 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -20,7 +20,7 @@
 import com.google.common.base.Function;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.utils.SampleUtil;
 import org.apache.metron.indexing.integration.IndexingIntegrationTest;
@@ -34,7 +34,6 @@
 import org.apache.metron.solr.integration.components.SolrComponent;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;