| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.metron.elasticsearch.writer; |
| |
| import org.apache.metron.common.Constants; |
| import org.apache.metron.common.configuration.writer.WriterConfiguration; |
| import org.apache.metron.common.writer.BulkMessage; |
| import org.apache.metron.common.writer.BulkWriterResponse; |
| import org.apache.metron.common.writer.MessageId; |
| import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter; |
| import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults; |
| import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; |
| import org.json.simple.JSONObject; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.powermock.modules.junit4.PowerMockRunner; |
| |
| import java.text.SimpleDateFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| import static org.powermock.api.mockito.PowerMockito.mockStatic; |
| |
| @RunWith(PowerMockRunner.class) |
| @PrepareForTest({ElasticsearchWriter.class, ElasticsearchUtils.class}) |
| public class ElasticsearchWriterTest { |
| |
| Map stormConf; |
| WriterConfiguration writerConfiguration; |
| |
| @Before |
| public void setup() { |
| writerConfiguration = mock(WriterConfiguration.class); |
| when(writerConfiguration.getGlobalConfig()).thenReturn(globals()); |
| |
| stormConf = new HashMap(); |
| } |
| |
| @Test |
| public void shouldWriteSuccessfully() { |
| // create a message id and a message associated with that id |
| List<BulkMessage<JSONObject>> messages = createMessages(1); |
| |
| // create a document writer which will successfully write all |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addSuccess(createDocument(messages.get(0))); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // response should only contain successes |
| assertFalse(response.hasErrors()); |
| assertTrue(response.getSuccesses().contains(new MessageId("message1"))); |
| } |
| |
| @Test |
| public void shouldWriteManySuccessfully() { |
| // create a few message ids and the messages associated with the ids |
| List<BulkMessage<JSONObject>> messages = createMessages(3); |
| |
| // create a document writer which will successfully write all |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addSuccess(createDocument(messages.get(0))); |
| results.addSuccess(createDocument(messages.get(1))); |
| results.addSuccess(createDocument(messages.get(2))); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // response should only contain successes |
| assertFalse(response.hasErrors()); |
| assertTrue(response.getSuccesses().contains(new MessageId("message1"))); |
| assertTrue(response.getSuccesses().contains(new MessageId("message2"))); |
| assertTrue(response.getSuccesses().contains(new MessageId("message3"))); |
| } |
| |
| @Test |
| public void shouldHandleWriteFailure() { |
| // create a message id and a message associated with that id |
| List<BulkMessage<JSONObject>> messages = createMessages(3); |
| Exception cause = new Exception(); |
| |
| // create a document writer which will fail all writes |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addFailure(createDocument(messages.get(0)), cause, "error"); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // the writer response should only contain failures |
| assertEquals(0, response.getSuccesses().size()); |
| assertEquals(1, response.getErrors().size()); |
| Collection<MessageId> errors = response.getErrors().get(cause); |
| assertTrue(errors.contains(new MessageId("message1"))); |
| } |
| |
| @Test |
| public void shouldHandleManyWriteFailures() { |
| // create a few message ids and the messages associated with the ids |
| int count = 3; |
| List<BulkMessage<JSONObject>> messages = createMessages(count); |
| Exception cause = new Exception(); |
| |
| // create a document writer which will fail all writes |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addFailure(createDocument(messages.get(0)), cause, "error"); |
| results.addFailure(createDocument(messages.get(1)), cause, "error"); |
| results.addFailure(createDocument(messages.get(2)), cause, "error"); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // the writer response should only contain failures |
| assertEquals(0, response.getSuccesses().size()); |
| assertEquals(1, response.getErrors().size()); |
| Collection<MessageId> errors = response.getErrors().get(cause); |
| assertTrue(errors.contains(new MessageId("message1"))); |
| assertTrue(errors.contains(new MessageId("message2"))); |
| assertTrue(errors.contains(new MessageId("message3"))); |
| } |
| |
| @Test |
| public void shouldHandlePartialFailures() { |
| // create a few message ids and the messages associated with the ids |
| int count = 2; |
| List<BulkMessage<JSONObject>> messages = createMessages(count); |
| Exception cause = new Exception(); |
| |
| // create a document writer that will fail one and succeed the other |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addFailure(createDocument(messages.get(0)), cause, "error"); |
| results.addSuccess(createDocument(messages.get(1))); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // response should contain some successes and some failures |
| assertEquals(1, response.getSuccesses().size()); |
| assertEquals(1, response.getErrors().size()); |
| assertTrue(response.getErrors().get(cause).contains(new MessageId("message1"))); |
| assertTrue(response.getSuccesses().contains(new MessageId("message2"))); |
| } |
| |
| @Test |
| public void shouldWriteSuccessfullyWhenMessageTimestampIsString() { |
| List<BulkMessage<JSONObject>> messages = createMessages(1); |
| JSONObject message = messages.get(0).getMessage(); |
| |
| // the timestamp is a String, rather than a Long |
| message.put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString()); |
| |
| // create the document |
| |
| String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName()); |
| String guid = (String) message.get(Constants.GUID); |
| String sensorType = (String) message.get(Constants.SENSOR_TYPE); |
| MessageIdBasedDocument document = new MessageIdBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), new MessageId("message1")); |
| |
| // create a document writer which will successfully write that document |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addSuccess(document); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // response should only contain successes |
| assertFalse(response.hasErrors()); |
| assertTrue(response.getSuccesses().contains(new MessageId("message1"))); |
| } |
| |
| @Test |
| public void shouldWriteSuccessfullyWhenMissingGUID() { |
| // create a message id and a message associated with that tuple |
| List<BulkMessage<JSONObject>> messages = createMessages(1); |
| |
| // remove the GUID from the message |
| assertNotNull(messages.get(0).getMessage().remove(Constants.GUID)); |
| |
| // create a document writer which will successfully write all |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addSuccess(createDocument(messages.get(0))); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // response should only contain successes |
| assertFalse(response.hasErrors()); |
| assertTrue(response.getSuccesses().contains(new MessageId("message1"))); |
| } |
| |
| @Test |
| public void shouldWriteManySuccessfullyWithSetDocumentId() { |
| when(writerConfiguration.isSetDocumentId("bro")).thenReturn(true); |
| when(writerConfiguration.getFieldNameConverter("bro")).thenReturn("NOOP"); |
| |
| mockStatic(ElasticsearchUtils.class); |
| when(ElasticsearchUtils.getIndexFormat(globals())).thenReturn(new SimpleDateFormat()); |
| when(ElasticsearchUtils.getIndexName(eq("bro"), any(), eq(writerConfiguration))).thenReturn("bro_index"); |
| |
| // create a few message ids and the messages associated with the ids |
| List<BulkMessage<JSONObject>> messages = createMessages(3); |
| |
| // documents should have field converted |
| MessageIdBasedDocument document1 = createDocument(messages.get(0)); |
| MessageIdBasedDocument document2 = createDocument(messages.get(1)); |
| MessageIdBasedDocument document3 = createDocument(messages.get(2)); |
| |
| // documents should have guid as documentID |
| document1.setDocumentID(document1.getGuid()); |
| document2.setDocumentID(document1.getGuid()); |
| document3.setDocumentID(document1.getGuid()); |
| |
| // create a document writer which will successfully write all |
| BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>(); |
| results.addSuccess(document1); |
| results.addSuccess(document2); |
| results.addSuccess(document3); |
| BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class); |
| when(docWriter.write()).thenReturn(results); |
| |
| // attempt to write |
| ElasticsearchWriter esWriter = new ElasticsearchWriter(); |
| esWriter.setDocumentWriter(docWriter); |
| esWriter.init(stormConf, writerConfiguration); |
| BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages); |
| |
| // documents should have metron guid as documentID |
| verify(docWriter, times(1)).addDocument(document1, "bro_index"); |
| verify(docWriter, times(1)).addDocument(document1, "bro_index"); |
| verify(docWriter, times(1)).addDocument(document1, "bro_index"); |
| |
| // response should only contain successes |
| assertFalse(response.hasErrors()); |
| assertTrue(response.getSuccesses().contains(new MessageId("message1"))); |
| assertTrue(response.getSuccesses().contains(new MessageId("message2"))); |
| assertTrue(response.getSuccesses().contains(new MessageId("message3"))); |
| } |
| |
| private MessageIdBasedDocument createDocument(BulkMessage<JSONObject> bulkWriterMessage) { |
| MessageId messageId = bulkWriterMessage.getId(); |
| JSONObject message = bulkWriterMessage.getMessage(); |
| Long timestamp = (Long) bulkWriterMessage.getMessage().get(Constants.Fields.TIMESTAMP.getName()); |
| String guid = (String) message.get(Constants.GUID); |
| String sensorType = (String) message.get(Constants.SENSOR_TYPE); |
| return new MessageIdBasedDocument(message, guid, sensorType, timestamp, messageId); |
| } |
| |
| private JSONObject message() { |
| JSONObject message = new JSONObject(); |
| message.put(Constants.GUID, UUID.randomUUID().toString()); |
| message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); |
| message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); |
| message.put(Constants.SENSOR_TYPE, "bro"); |
| return message; |
| } |
| |
| private Map<String, Object> globals() { |
| Map<String, Object> globals = new HashMap<>(); |
| globals.put("es.date.format", "yyyy.MM.dd.HH"); |
| return globals; |
| } |
| |
| private List<BulkMessage<JSONObject>> createMessages(int count) { |
| List<BulkMessage<JSONObject>> messages = new ArrayList<>(); |
| for(int i=0; i<count; i++) { |
| messages.add(new BulkMessage<>(new MessageId("message" + (i + 1)), message())); |
| } |
| return messages; |
| } |
| } |