METRON-2109 Add option to use Metron GUID as the id in Elasticsearch (merrimanr) closes apache/metron#1403
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index 5144be7..4d19769 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -87,6 +87,7 @@
 | [`es.port`](../metron-elasticsearch#esport)                                                                           | Indexing      | String     | N/A                                     |
 | [`es.date.format`](../metron-elasticsearch#esdateformat)                                                              | Indexing      | String     | `es_date_format`                        |
 | [`es.client.settings`](../metron-elasticsearch#esclientsettings)                                                      | Indexing      | Object     | N/A                                     |
+| [`indexing.writer.elasticsearch.setDocumentId`](../metron-indexing#elasticsearch)                                                        | Indexing      | Boolean    | N/A                                     |
 | [`solr.zookeeper`](../metron-solr#configuration)                                                                      | Indexing      | String     | `solr_zookeeper_url`                    |
 | [`solr.commitPerBatch`](../metron-solr#configuration)                                                                 | Indexing      | String     | N/A                                     |
 | [`solr.commit.soft`](../metron-solr#configuration)                                                                    | Indexing      | String     | N/A                                     |
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 584bed1..5001767 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -36,6 +36,8 @@
   public static final String INDEX_CONF = "index";
   public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
   public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter";
+  public static final String SET_DOCUMENT_ID_CONF = "setDocumentId";
+  public static final String GLOBAL_ELASTICSEARCH_SET_DOCUMENT_ID_CONF = "indexing.writer.elasticsearch.setDocumentId";
 
   /**
    * Gets the indexing config for a specific sensor.
@@ -184,6 +186,10 @@
     return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName);
   }
 
+  public boolean isSetDocumentId(String sensorName, String writerName) {
+    return isSetDocumentId(getGlobalConfig(true), getSensorIndexingConfig(sensorName, writerName));
+  }
+
   /**
    *  Retrieves the enabled value from the config.
    *
@@ -268,6 +274,17 @@
   }
 
   /**
+   * Determines if the Metron generated id should be used when indexing
+   *
+   * @param globalConf The global config
+   * @param sensorConf The indexing config for a given sensor
+   * @return True if the Metron generated id should be used as the id, False otherwise
+   */
+  public static boolean isSetDocumentId(Map<String, Object> globalConf, Map<String, Object> sensorConf) {
+    return getAs(SET_DOCUMENT_ID_CONF, sensorConf, getAs(GLOBAL_ELASTICSEARCH_SET_DOCUMENT_ID_CONF, globalConf, false, Boolean.class), Boolean.class);
+  }
+
+  /**
    * Sets the enabled flag in the config.
    *
    * @param conf The configuration map to set enabled in. If null replaced with empty map.
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
index ab25a80..fbd1178 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -77,4 +77,9 @@
   public String getFieldNameConverter(String sensorName) {
     return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName);
   }
+
+  @Override
+  public boolean isSetDocumentId(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).isSetDocumentId(sensorName, writerName);
+  }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
index e75a65d..5c932da 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -103,4 +103,13 @@
    * @return The {@link FieldNameConverter}
    */
   String getFieldNameConverter(String sensorName);
+
+  /**
+   * Returns true, if the current writer configuration is set to use the GUID generated by Metron as the id
+   * @param sensorName The name of the sensor.
+   * @return True, if writer is configured to use GUID generated by Metron, false otherwise (and by default)
+   */
+  default boolean isSetDocumentId(String sensorName) {
+    return false;
+  }
 }
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/IndexingConfigurationsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/IndexingConfigurationsTest.java
new file mode 100644
index 0000000..f561423
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/IndexingConfigurationsTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IndexingConfigurationsTest {
+
+  /**
+   * {
+   *  "indexing.writer.elasticsearch.setDocumentId" : "true"
+   * }
+   */
+  @Multiline
+  private static String globalConfig;
+
+  /**
+   * {
+   *  "writer" : {
+   *    "setDocumentId": true
+   *  }
+   * }
+   */
+  @Multiline
+  private static String sensorConfig;
+
+  private IndexingConfigurations configurations;
+
+  @Before
+  public void setup() {
+    configurations = new IndexingConfigurations();
+  }
+
+  @Test
+  public void shouldReturnDefaultSetDocumentId() throws Exception {
+    // verify false by default
+    assertFalse(configurations.isSetDocumentId("sensor", "writer"));
+  }
+
+  @Test
+  public void shouldReturnGlobalSetDocumentId() throws Exception {
+    // verify global config setting applies to any sensor
+    configurations.updateGlobalConfig(globalConfig.getBytes(StandardCharsets.UTF_8));
+
+    assertTrue(configurations.isSetDocumentId("sensor", "writer"));
+    assertTrue(configurations.isSetDocumentId("anySensor", "writer"));
+  }
+
+  @Test
+  public void shouldReturnSensorSetDocumentId() throws Exception {
+    // verify sensor config only applies to that sensor
+    configurations.updateGlobalConfig(new HashMap<>());
+    configurations.updateSensorIndexingConfig("sensor", sensorConfig.getBytes(StandardCharsets.UTF_8));
+
+    assertTrue(configurations.isSetDocumentId("sensor", "writer"));
+    assertFalse(configurations.isSetDocumentId("anySensor", "writer"));
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
index bde5664..e753711 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -130,7 +130,7 @@
         // if creating a new document, set the doc ID to null to allow Elasticsearch to generate one.
         String docId = document.getDocumentID().orElse(null);
         if(LOG.isDebugEnabled() && document.getDocumentID().isPresent()) {
-            LOG.debug("Updating existing document with known doc ID; docID={}, guid={}, sensorType={}",
+            LOG.debug("Creating/Updating a document with known doc ID; docID={}, guid={}, sensorType={}",
                     docId, document.getGuid(), document.getSensorType());
         } else if(LOG.isDebugEnabled()) {
             LOG.debug("Creating a new document, doc ID not yet known; guid={}, sensorType={}",
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index b7814b6..ef6577f 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -97,7 +97,7 @@
 
     // create a document from each message
     for(BulkMessage<JSONObject> bulkWriterMessage: messages) {
-      MessageIdBasedDocument document = createDocument(bulkWriterMessage, sensorType, fieldNameConverter);
+      MessageIdBasedDocument document = createDocument(bulkWriterMessage, sensorType, fieldNameConverter, configurations.isSetDocumentId(sensorType));
       documentWriter.addDocument(document, indexName);
     }
 
@@ -117,7 +117,8 @@
 
   private MessageIdBasedDocument createDocument(BulkMessage<JSONObject> bulkWriterMessage,
                                                 String sensorType,
-                                                FieldNameConverter fieldNameConverter) {
+                                                FieldNameConverter fieldNameConverter,
+                                                boolean setDocumentId) {
     // transform the message fields to the source fields of the indexed document
     JSONObject source = new JSONObject();
     JSONObject message = bulkWriterMessage.getMessage();
@@ -139,8 +140,12 @@
     } else {
       LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName());
     }
-
-    return new MessageIdBasedDocument(source, guid, sensorType, timestamp, bulkWriterMessage.getId());
+    MessageIdBasedDocument messageIdBasedDocument = new MessageIdBasedDocument(source, guid, sensorType, timestamp, bulkWriterMessage.getId());
+    if (setDocumentId) {
+      // Use the metron-generated GUID instead of letting Elasticsearch set the id
+      messageIdBasedDocument.setDocumentID(guid);
+    }
+    return messageIdBasedDocument;
   }
 
   @Override
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 7036078..705beda 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -145,7 +145,11 @@
     for (Object broObject: (JSONArray) new JSONParser().parse(broData)) {
       broDocuments.add(((JSONObject) broObject).toJSONString());
     }
-    es.add(BRO_INDEX, "bro", broDocuments);
+    // add documents using Metron GUID
+    es.add(BRO_INDEX, "bro", broDocuments.subList(0, 4), true);
+
+    // add a document to the same index but with an Elasticsearch id
+    es.add(BRO_INDEX, "bro", broDocuments.subList(4, 5), false);
 
     // write the test documents for Snort
     List<String> snortDocuments = new ArrayList<>();
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index dfdf88e..18d947d 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -217,10 +217,14 @@
           throws IOException, ParseException {
     List<String> d = new ArrayList<>();
     Collections.addAll(d, docs);
-    add(indexName, sensorType, d);
+    add(indexName, sensorType, d, false);
   }
 
-  public void add(String indexName, String sensorType, Iterable<String> docs)
+  public void add(String indexName, String sensorType, Iterable<String> docs) throws IOException, ParseException {
+    add(indexName, sensorType, docs, false);
+  }
+
+  public void add(String indexName, String sensorType, Iterable<String> docs, boolean setDocumentId)
           throws IOException, ParseException {
 
     // create a collection of indexable documents
@@ -228,7 +232,7 @@
     Map<Document, Optional<String>> documents = new HashMap<>();
     for(String json: docs) {
       JSONObject message = (JSONObject) parser.parse(json);
-      documents.put(createDocument(message, sensorType), Optional.of(indexName));
+      documents.put(createDocument(message, sensorType, setDocumentId), Optional.of(indexName));
     }
 
     // write the documents
@@ -243,11 +247,15 @@
    * @return The {@link Document} that was written.
    * @throws IOException
    */
-  private static Document createDocument(JSONObject message, String docType) throws IOException {
+  private static Document createDocument(JSONObject message, String docType, boolean setDocumentId) throws IOException {
     Long timestamp = ConversionUtils.convert(message.get("timestamp"), Long.class);
     String source = message.toJSONString();
     String guid = (String) message.get("guid");
-    return new Document(source, guid, docType, timestamp);
+    Document document = new Document(source, guid, docType, timestamp);
+    if (setDocumentId) {
+      document.setDocumentID(guid);
+    }
+    return document;
   }
 
   public void createIndexWithMapping(String indexName, String mappingType, String mappingSource)
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index ba5cfe0..2d5fd2a 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -25,10 +25,15 @@
 import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,9 +45,16 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ElasticsearchWriter.class, ElasticsearchUtils.class})
 public class ElasticsearchWriterTest {
 
     Map stormConf;
@@ -243,6 +255,54 @@
         assertTrue(response.getSuccesses().contains(new MessageId("message1")));
     }
 
+    @Test
+    public void shouldWriteManySuccessfullyWithSetDocumentId() {
+        when(writerConfiguration.isSetDocumentId("bro")).thenReturn(true);
+        when(writerConfiguration.getFieldNameConverter("bro")).thenReturn("NOOP");
+
+        mockStatic(ElasticsearchUtils.class);
+        when(ElasticsearchUtils.getIndexFormat(globals())).thenReturn(new SimpleDateFormat());
+        when(ElasticsearchUtils.getIndexName(eq("bro"), any(), eq(writerConfiguration))).thenReturn("bro_index");
+
+        // create a few message ids and the messages associated with the ids
+        List<BulkMessage<JSONObject>> messages = createMessages(3);
+
+        // documents should have field converted
+        MessageIdBasedDocument document1 = createDocument(messages.get(0));
+        MessageIdBasedDocument document2 = createDocument(messages.get(1));
+        MessageIdBasedDocument document3 = createDocument(messages.get(2));
+
+        // documents should have guid as documentID
+        document1.setDocumentID(document1.getGuid());
+        document2.setDocumentID(document1.getGuid());
+        document3.setDocumentID(document1.getGuid());
+
+        // create a document writer which will successfully write all
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(document1);
+        results.addSuccess(document2);
+        results.addSuccess(document3);
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
+
+        // documents should have metron guid as documentID
+        verify(docWriter, times(1)).addDocument(document1, "bro_index");
+        verify(docWriter, times(1)).addDocument(document1, "bro_index");
+        verify(docWriter, times(1)).addDocument(document1, "bro_index");
+
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(new MessageId("message1")));
+        assertTrue(response.getSuccesses().contains(new MessageId("message2")));
+        assertTrue(response.getSuccesses().contains(new MessageId("message3")));
+    }
+
     private MessageIdBasedDocument createDocument(BulkMessage<JSONObject> bulkWriterMessage) {
         MessageId messageId = bulkWriterMessage.getId();
         JSONObject message = bulkWriterMessage.getMessage();
@@ -257,7 +317,7 @@
         message.put(Constants.GUID, UUID.randomUUID().toString());
         message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
         message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
-        message.put(Constants.SENSOR_TYPE, "sensor");
+        message.put(Constants.SENSOR_TYPE, "bro");
         return message;
     }
 
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 46e511b..990a5a1 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -69,13 +69,29 @@
 | `batchTimeout`       | The timeout after which a batch will be flushed even if `batchSize` has not been met. | Defaults to a duration which is a fraction of the Storm parameter `topology.message.timeout.secs`, if left undefined or set to 0.  Ignored if batchSize is `1`, since this disables batching.|
 | `enabled`            | A boolean indicating whether the writer is enabled.                                   | Defaults to `true`                                                                                                                                    |
 | `fieldNameConverter` | Defines how field names are transformed before being written to the index.  Only applicable to `elasticsearch`.          | Defaults to `DEDOT`.  Acceptable values are `DEDOT` that replaces all '.' with ':' or `NOOP` that does not change the field names . |
+| `setDocumentId`      | A boolean indicating whether the writer should use the document id generated by Metron| Defaults to `false`.  This setting only applies to Elasticsearch, the id used with Solr is configured in the Solr schemas.
 
 
 ### Meta Alerts
 Alerts can be grouped, after appropriate searching, into a set of alerts called a meta alert.  A meta alert is useful for maintaining the context of searching and grouping during further investigations. Standard searches can return meta alerts, but grouping and other aggregation or sorting requests will not, because there's not a clear way to aggregate in many cases if there are multiple alerts contained in the meta alert. All meta alerts will have the source type of metaalert, regardless of the contained alert's origins.
 
 ### Elasticsearch
-Metron comes with built-in templates for the default sensors for Elasticsearch. When adding a new sensor, it will be necessary to add a new template defining the output fields appropriately. In addition, there is a requirement for a field `alert` of type `nested` for Elasticsearch 2.x installs.  This is detailed at [Using Metron with Elasticsearch 2.x](../metron-elasticsearch/README.md#using-metron-with-elasticsearch-2x)
+Metron comes with built-in templates for the default sensors for Elasticsearch. When adding a new sensor, it will be necessary to add a new template defining the output fields appropriately. In addition, there is a requirement for a field `alert` of type `nested` for Elasticsearch 2.x installs.  This is detailed at [Using Metron with Elasticsearch 2.x](../metron-elasticsearch/README.md#using-metron-with-elasticsearch-2x).
+
+Metron is configured by default to let Elasticsearch [use auto-generated ids](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/tune-for-indexing-speed.html#_use_auto_generated_ids) for performance reasons.  However, due to Storm's at least once processing guarantee, it is possible for duplicate messages to be indexed when messages are replayed for whatever reason.  If this scenario is less desirable, the Metron generated id stored in the `guid` field of the message can be used instead.  This can be configured for individual sensors by setting the `setDocumentId` setting to true in the [Sensor Indexing Configuration](#sensor-indexing-configuration):
+```
+{
+  "elasticsearch": {
+    "enabled": true,
+    "index": "bro",
+    "setDocumentId": true
+  }
+}
+```
+This can also be set for all sensors in the [Global Configuration](../metron-common#global-configuration) by setting the `indexing.writer.elasticsearch.setDocumentId` setting to true.  These settings are applied in this order of precedence (highest to lowest):
+1. Sensor indexing configuration
+2. Global configuration
+3. False by default
 
 ### Solr